// AccessionUploadController.java
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.server.api.v1;
import java.util.ArrayList;
import java.util.List;
import javax.persistence.PersistenceException;
import javax.persistence.RollbackException;
import javax.validation.ValidationException;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys.server.api.ApiBaseController;
import org.genesys.server.api.PleaseRetryException;
import org.genesys.server.exception.InvalidApiUsageException;
import org.genesys.server.exception.NotFoundElement;
import org.genesys.server.model.impl.FaoInstitute;
import org.genesys.server.service.InstituteService;
import org.genesys.server.service.impl.RESTApiException;
import org.genesys.server.service.worker.AccessionOpResponse;
import org.genesys.server.service.worker.AccessionOpResponse.UpsertResult;
import org.genesys.server.service.worker.AccessionUploader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.http.MediaType;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.transaction.TransactionException;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import io.swagger.annotations.Api;
/**
 * Accession API v1: the uploader.
 *
 * <p>
 * Exposes bulk "upsert" and "delete" endpoints for the accessions of one
 * institute. Each endpoint first attempts the operation on the whole batch and
 * falls back to processing the entries one by one when the bulk call fails, so
 * that a single bad record does not fail the entire request.
 * </p>
 */
@RestController("accessionUploadApi1")
@PreAuthorize("isAuthenticated()")
@RequestMapping(AccessionUploadController.CONTROLLER_URL)
@Api(tags = { "accession" })
public class AccessionUploadController {

    /** The Constant LOG. */
    private static final Logger LOG = LoggerFactory.getLogger(AccessionUploadController.class);

    /** Used only to create the single-element arrays for one-by-one processing. */
    private final ObjectMapper mapper = new ObjectMapper();

    /** The Constant CONTROLLER_URL. */
    public static final String CONTROLLER_URL = ApiBaseController.APIv1_BASE + "/acn";

    /** Maximum number of attempts made for one upload operation. */
    private static final int UPLOAD_RETRIES = 2;

    /** The uploader. */
    @Autowired
    private AccessionUploader uploader;

    /** The institute service. */
    @Autowired
    InstituteService instituteService;

    /**
     * Update accessions in the system.
     *
     * <p>
     * The whole batch is submitted first. A {@link PleaseRetryException} causes
     * the bulk call to be repeated (up to {@link #UPLOAD_RETRIES} attempts); any
     * other exception switches to one-by-one processing of the batch.
     * </p>
     *
     * @param instCode the WIEWS institute code
     * @param updates the updates
     * @return one response per processed entry
     * @throws NotFoundElement if no institute is found for {@code instCode}
     * @throws InvalidApiUsageException if every bulk attempt asked for a retry
     * @throws Exception any other failure, including interruption while waiting
     *         between attempts
     */
    @PreAuthorize("isAuthenticated()")
    @RequestMapping(value = "/{instCode}/upsert", method = { RequestMethod.POST }, produces = { MediaType.APPLICATION_JSON_VALUE })
    public List<AccessionOpResponse> upsertInstituteAccession(@PathVariable("instCode") String instCode, @RequestBody ArrayNode updates) throws Exception {
        StopWatch stopWatch = StopWatch.createStarted();
        LOG.trace("Received:\n {}", updates);

        // User's permission to WRITE to this WIEWS institute are checked in
        // BatchRESTService.
        final FaoInstitute institute = instituteService.getInstitute(instCode);
        if (institute == null) {
            throw new NotFoundElement();
        }

        for (int tryCount = 0; tryCount < UPLOAD_RETRIES; tryCount++) {
            try {
                List<AccessionOpResponse> res = uploader.upsertAccessions(institute, updates);
                LOG.info("Processed {} accessions for {} in {} tries in {}ms", updates.size(), instCode, tryCount + 1, stopWatch.getTime());
                return res;
            } catch (PleaseRetryException e) {
                LOG.error("Retry {} of {} tries due to: {}", tryCount + 1, UPLOAD_RETRIES, e.getMessage());
                // Wait a bit, but only if another attempt will actually follow
                if (tryCount + 1 < UPLOAD_RETRIES) {
                    backOff(tryCount);
                }
            } catch (Exception e) {
                // Validation and integrity failures are expected for bad input and
                // are reported per entry by the 1-by-1 pass; only log the rest.
                if (!(e instanceof ValidationException || e instanceof DataIntegrityViolationException)) {
                    LOG.warn("Error inserting bulk: {}", e.getMessage(), e);
                }
                List<AccessionOpResponse> res = upsert1By1(institute, updates);
                LOG.info("Processed {} accessions for {} 1by1 after {} tries in {}ms because of {}", updates.size(), instCode, tryCount + 1, stopWatch.getTime(), e.getMessage());
                return res;
            }
        }

        throw new InvalidApiUsageException("Just couldn't do it.");
    }

    /**
     * Upsert 1 by 1.
     *
     * <p>
     * Each entry is submitted on its own, with up to {@link #UPLOAD_RETRIES}
     * attempts for transient failures. Validation and API usage errors are not
     * retried. An entry that still fails is reported as an ERROR response
     * instead of aborting the remaining entries.
     * </p>
     *
     * @param institute the institute
     * @param updates the updates
     * @return the responses of successful upserts plus one ERROR response per
     *         failed entry
     * @throws Exception any exception not handled as a per-entry failure,
     *         including interruption while waiting between attempts
     */
    private List<AccessionOpResponse> upsert1By1(FaoInstitute institute, ArrayNode updates) throws Exception {
        List<AccessionOpResponse> response = new ArrayList<>();
        // Reused single-element array holding the entry currently processed
        ArrayNode single = mapper.createArrayNode();

        for (JsonNode update : updates) {
            single.removeAll();
            single.add(update);

            Throwable lastException = null;
            for (int tryCount = 0; tryCount < UPLOAD_RETRIES; tryCount++) {
                try {
                    response.addAll(uploader.upsertAccessions(institute, single));
                    lastException = null;
                    break;
                } catch (ValidationException | InvalidApiUsageException e) {
                    lastException = e;
                    break; // don't retry
                } catch (TransactionException | RollbackException | CannotAcquireLockException | DataIntegrityViolationException | PleaseRetryException e) {
                    lastException = e;
                    // First failure is routine; repeated failures are worth INFO
                    if (tryCount > 0) {
                        LOG.info("Retry {} of {} tries due to: {}", tryCount + 1, UPLOAD_RETRIES, e.getMessage());
                    } else {
                        LOG.debug("Retrying {} of {} tries due to: {}", tryCount + 1, UPLOAD_RETRIES, e.getMessage());
                    }
                    // Wait a bit, but only if another attempt will actually follow
                    if (tryCount + 1 < UPLOAD_RETRIES) {
                        backOff(tryCount);
                    }
                }
            }

            if (lastException != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Upsert failed due to: {} data={}", lastException.getMessage(), update, lastException);
                } else {
                    LOG.info("Upsert failed due to: {} data={}", lastException.getMessage(), update);
                }
                // record error
                response.add(new AccessionOpResponse(update)
                    .setResult(new UpsertResult(UpsertResult.Type.ERROR)).setError(getDetailedErrorMessage(lastException)));
            }
        }

        return response;
    }

    /**
     * Delete accessions by instituteCode, acceNumb and genus.
     *
     * <p>
     * The batch is deleted in one call; on a data access, transaction or
     * persistence failure the entries are deleted one by one instead.
     * </p>
     *
     * @param instCode the inst code
     * @param batch the batch
     * @return one response per processed entry
     * @throws NotFoundElement if no institute is found for {@code instCode}
     * @throws RESTApiException the REST api exception
     */
    @PreAuthorize("isAuthenticated()")
    @RequestMapping(value = "/{instCode}/delete", method = { RequestMethod.POST }, consumes = { MediaType.APPLICATION_JSON_VALUE }, produces = {
        MediaType.APPLICATION_JSON_VALUE })
    public @ResponseBody List<AccessionOpResponse> deleteAccessions(@PathVariable("instCode") String instCode, @RequestBody ArrayNode batch) throws RESTApiException {
        // User's permission to WRITE to this WIEWS institute are checked in
        // BatchRESTService.
        final FaoInstitute institute = instituteService.getInstitute(instCode);
        if (institute == null) {
            throw new NotFoundElement();
        }

        try {
            try {
                final List<AccessionOpResponse> response = uploader.deleteAccessions(institute, batch);
                LOG.info("Deleted {} accessions from {}", response.size(), instCode);
                return response;
            } catch (DataAccessException | TransactionException | PersistenceException e) {
                LOG.info("Retrying delete one by one due to {}", e.getMessage());
                return deleteAccessions1by1(institute, batch);
            }
        } catch (CannotAcquireLockException e) {
            // NOTE(review): CannotAcquireLockException is a DataAccessException, so a
            // lock failure of the bulk call is already handled by the one-by-one
            // fallback above; this handler only sees failures escaping the fallback.
            throw new PleaseRetryException("Operation failed, please retry.", e);
        }
    }

    /**
     * Delete accessions 1 by 1.
     *
     * <p>
     * A failure to delete one entry is recorded in that entry's response and
     * does not stop the processing of the remaining entries.
     * </p>
     *
     * @param institute the institute
     * @param batch the batch
     * @return one response per entry of {@code batch}
     */
    private List<AccessionOpResponse> deleteAccessions1by1(FaoInstitute institute, ArrayNode batch) {
        LOG.info("Attempting delete 1 by 1");
        // Reused single-element array holding the entry currently processed
        final ArrayNode single = mapper.createArrayNode();
        final List<AccessionOpResponse> response = new ArrayList<>();

        for (JsonNode accn : batch) {
            try {
                single.removeAll();
                single.add(accn);
                response.add(uploader.deleteAccessions(institute, single).get(0));
            } catch (Exception e) {
                LOG.info("Error deleting {}: {}", accn.get("instituteCode"), e.getMessage());
                AccessionOpResponse accessionResponse = new AccessionOpResponse(accn);
                accessionResponse.setError(getDetailedErrorMessage(e));
                response.add(accessionResponse);
            }
        }

        return response;
    }

    /**
     * Sleeps for a random interval before the next attempt. The interval grows
     * with the number of the attempt that just failed and is never zero.
     *
     * @param attempt zero-based index of the attempt that just failed
     * @throws InterruptedException if the thread is interrupted while waiting;
     *         the interrupt flag is restored before the exception propagates
     */
    private static void backOff(int attempt) throws InterruptedException {
        try {
            Thread.sleep((long) (RandomUtils.nextDouble(10, 100) * (attempt + 1)));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /**
     * Gets the detailed error message: the message of the exception followed by
     * the messages of all its causes, one per line. An exception without a
     * message is represented by its class name.
     *
     * @param e the exception
     * @return the detailed error message
     */
    private String getDetailedErrorMessage(Throwable e) {
        final StringBuilder sb = new StringBuilder(describe(e));
        for (Throwable cause = e.getCause(); cause != null; cause = cause.getCause()) {
            sb.append('\n').append(describe(cause));
        }
        return sb.toString();
    }

    /**
     * Returns the message of the throwable, or its class name when it has no
     * message (as is common for e.g. {@link NullPointerException}).
     *
     * @param t the throwable
     * @return a non-null description of {@code t}
     */
    private static String describe(Throwable t) {
        final String message = t.getMessage();
        return message != null ? message : t.getClass().getName();
    }
}