// AccessionUploadController.java

/*
 * Copyright 2018 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.genesys.server.api.v1;

import java.util.ArrayList;
import java.util.List;

import javax.persistence.PersistenceException;
import javax.persistence.RollbackException;
import javax.validation.ValidationException;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys.server.api.ApiBaseController;
import org.genesys.server.api.PleaseRetryException;
import org.genesys.server.exception.InvalidApiUsageException;
import org.genesys.server.exception.NotFoundElement;
import org.genesys.server.model.impl.FaoInstitute;
import org.genesys.server.service.InstituteService;
import org.genesys.server.service.impl.RESTApiException;
import org.genesys.server.service.worker.AccessionOpResponse;
import org.genesys.server.service.worker.AccessionOpResponse.UpsertResult;
import org.genesys.server.service.worker.AccessionUploader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.http.MediaType;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.transaction.TransactionException;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;

import io.swagger.annotations.Api;

/**
 * Accession API v1: the uploader.
 *
 * <p>
 * Exposes bulk upsert and bulk delete of accession records for one institute.
 * Both operations first try to process the whole batch in a single call to the
 * {@link AccessionUploader}; when that fails they fall back to processing the
 * records one by one so that a single bad record is reported individually
 * instead of failing the whole request.
 * </p>
 */
@RestController("accessionUploadApi1")
@PreAuthorize("isAuthenticated()")
@RequestMapping(AccessionUploadController.CONTROLLER_URL)
@Api(tags = { "accession" })
public class AccessionUploadController {

	/** The Constant LOG. */
	private static final Logger LOG = LoggerFactory.getLogger(AccessionUploadController.class);

	/** Used only to create the single-element arrays for one-by-one processing. */
	private final ObjectMapper mapper = new ObjectMapper();

	/** The Constant CONTROLLER_URL. */
	public static final String CONTROLLER_URL = ApiBaseController.APIv1_BASE + "/acn";

	/** Maximum number of attempts made for one uploader call before giving up. */
	private static final int UPLOAD_RETRIES = 2;

	/** The uploader. */
	@Autowired
	private AccessionUploader uploader;

	/** The institute service. */
	@Autowired
	InstituteService instituteService;


	/**
	 * Update accessions in the system.
	 *
	 * <p>
	 * The whole batch is submitted up to {@link #UPLOAD_RETRIES} times as long as
	 * the uploader answers with {@link PleaseRetryException}. Any other exception
	 * switches to one-by-one processing of the batch.
	 * </p>
	 *
	 * @param instCode the WIEWS institute code
	 * @param updates the updates
	 * @return one response per processed accession
	 * @throws NotFoundElement when no institute exists for {@code instCode}
	 * @throws InvalidApiUsageException when every bulk attempt asked for a retry;
	 *         the last {@link PleaseRetryException} is attached as a suppressed
	 *         exception
	 * @throws Exception any other failure raised while upserting
	 */
	@PreAuthorize("isAuthenticated()")
	@RequestMapping(value = "/{instCode}/upsert", method = { RequestMethod.POST }, produces = { MediaType.APPLICATION_JSON_VALUE })
	public List<AccessionOpResponse> upsertInstituteAccession(@PathVariable("instCode") String instCode, @RequestBody ArrayNode updates) throws Exception {

		StopWatch stopWatch = StopWatch.createStarted();

		LOG.trace("Received:\n {}", updates);

		// User's permission to WRITE to this WIEWS institute are checked in
		// BatchRESTService.
		final FaoInstitute institute = instituteService.getInstitute(instCode);
		if (institute == null) {
			throw new NotFoundElement();
		}

		PleaseRetryException lastRetryRequest = null;
		for (int tryCount = 0; tryCount < UPLOAD_RETRIES; tryCount++) {
			final int attempt = tryCount + 1;
			try {
				List<AccessionOpResponse> res = uploader.upsertAccessions(institute, updates);
				LOG.info("Processed {} accessions for {} in {} tries in {}ms", updates.size(), instCode, attempt, stopWatch.getTime());
				return res;
			} catch (PleaseRetryException e) {
				lastRetryRequest = e;
				LOG.error("Retry {} of {} tries due to: {}", attempt, UPLOAD_RETRIES, e.getMessage());
				// Wait a bit, but only when another attempt will actually follow
				if (attempt < UPLOAD_RETRIES) {
					backOff(attempt);
				}
			} catch (Exception e) {
				// Validation and integrity problems are expected with bad input and are
				// reported per record by the 1-by-1 pass, so they are not logged here.
				if (!(e instanceof ValidationException || e instanceof DataIntegrityViolationException)) {
					LOG.warn("Error inserting bulk: {}", e.getMessage(), e);
				}
				List<AccessionOpResponse> res = upsert1By1(institute, updates);
				LOG.info("Processed {} accessions for {} 1by1 after {} tries in {}ms because of {}", updates.size(), instCode, attempt, stopWatch.getTime(), e.getMessage());
				return res;
			}
		}

		// Every attempt asked for a retry: keep the last reason attached for diagnostics
		final InvalidApiUsageException failure = new InvalidApiUsageException("Just couldn't do it.");
		if (lastRetryRequest != null) {
			failure.addSuppressed(lastRetryRequest);
		}
		throw failure;
	}

	/**
	 * Upsert 1 by 1.
	 *
	 * <p>
	 * Each record is submitted on its own, up to {@link #UPLOAD_RETRIES} times for
	 * the exception types handled as retryable below. Validation and API usage
	 * errors are not retried. A record that still fails is reported with an
	 * {@code ERROR} result carrying the detailed error message. Exceptions of
	 * other types are not handled here and propagate to the caller.
	 * </p>
	 *
	 * @param institute the institute
	 * @param updates the updates
	 * @return the responses of the successful upserts and the error responses of the failed ones
	 * @throws Exception the exception
	 */
	private List<AccessionOpResponse> upsert1By1(FaoInstitute institute, ArrayNode updates) throws Exception {
		List<AccessionOpResponse> response = new ArrayList<>();
		// Reused single-element array handed to the uploader for each record
		ArrayNode single = mapper.createArrayNode();
		for (JsonNode update : updates) {
			single.removeAll();
			single.add(update);
			Throwable lastException = null;
			for (int tryCount = 0; tryCount < UPLOAD_RETRIES; tryCount++) {
				final int attempt = tryCount + 1;
				try {
					response.addAll(uploader.upsertAccessions(institute, single));
					lastException = null;
					break;
				} catch (ValidationException | InvalidApiUsageException e) {
					lastException = e;
					break; // don't retry
				} catch (TransactionException | RollbackException | CannotAcquireLockException | DataIntegrityViolationException | PleaseRetryException e) {
					lastException = e;
					// First failure is routine; repeated failures are worth an INFO entry
					if (attempt > 1) {
						LOG.info("Retry {} of {} tries due to: {}", attempt, UPLOAD_RETRIES, e.getMessage());
					} else {
						LOG.debug("Retrying {} of {} tries due to: {}", attempt, UPLOAD_RETRIES, e.getMessage());
					}
					// Wait a bit, but only when another attempt will actually follow
					if (attempt < UPLOAD_RETRIES) {
						backOff(attempt);
					}
				}
			}
			if (lastException != null) {
				if (LOG.isDebugEnabled()) {
					// Trailing throwable argument makes the logger print the stack trace
					LOG.debug("Upsert failed due to: {} data={}", lastException.getMessage(), update, lastException);
				} else {
					LOG.info("Upsert failed due to: {} data={}", lastException.getMessage(), update);
				}
				// record error
				response.add(new AccessionOpResponse(update)
					.setResult(new UpsertResult(UpsertResult.Type.ERROR)).setError(getDetailedErrorMessage(lastException)));
			}
		}
		return response;
	}

	/**
	 * Sleeps for a random time that grows with the number of failed attempts.
	 *
	 * @param failedAttempts number of attempts that have failed so far (1 or more)
	 * @throws InterruptedException when the thread is interrupted while waiting
	 */
	private static void backOff(int failedAttempts) throws InterruptedException {
		Thread.sleep((long) (RandomUtils.nextDouble(10, 100) * failedAttempts));
	}


	/**
	 * Delete accessions by instituteCode, acceNumb and genus.
	 *
	 * <p>
	 * The batch is deleted in one call; if that call fails with a data access,
	 * transaction or persistence exception, the records are deleted one by one
	 * and failures are reported per record.
	 * </p>
	 *
	 * @param instCode the inst code
	 * @param batch the batch
	 * @return one response per accession in the batch
	 * @throws NotFoundElement when no institute exists for {@code instCode}
	 * @throws RESTApiException the REST api exception
	 */
	@PreAuthorize("isAuthenticated()")
	@RequestMapping(value = "/{instCode}/delete", method = { RequestMethod.POST }, consumes = { MediaType.APPLICATION_JSON_VALUE }, produces = {
			MediaType.APPLICATION_JSON_VALUE })
	public @ResponseBody List<AccessionOpResponse> deleteAccessions(@PathVariable("instCode") String instCode, @RequestBody ArrayNode batch) throws RESTApiException {
		// User's permission to WRITE to this WIEWS institute are checked in
		// BatchRESTService.
		final FaoInstitute institute = instituteService.getInstitute(instCode);
		if (institute == null) {
			throw new NotFoundElement();
		}

		// Lock acquisition failures are DataAccessExceptions, so they take the
		// one-by-one fallback below like every other data access failure.
		try {
			final List<AccessionOpResponse> response = uploader.deleteAccessions(institute, batch);
			LOG.info("Deleted {} accessions from {}", response.size(), instCode);
			return response;
		} catch (DataAccessException | TransactionException | PersistenceException e) {
			LOG.info("Retrying delete one by one due to {}", e.getMessage());
			return deleteAccessions1by1(institute, batch);
		}
	}

	/**
	 * Delete accessions 1 by 1.
	 *
	 * <p>
	 * A failure to delete one record is logged and reported in that record's
	 * response; processing continues with the next record.
	 * </p>
	 *
	 * @param institute the institute
	 * @param batch the batch
	 * @return one response per accession in the batch
	 */
	private List<AccessionOpResponse> deleteAccessions1by1(FaoInstitute institute, ArrayNode batch) {
		LOG.info("Attempting delete 1 by 1");

		// Reused single-element array handed to the uploader for each record
		final ArrayNode single = mapper.createArrayNode();
		final List<AccessionOpResponse> response = new ArrayList<>();

		for (JsonNode accn : batch) {
			try {
				single.removeAll();
				single.add(accn);
				response.add(uploader.deleteAccessions(institute, single).get(0));
			} catch (Exception e) {
				LOG.info("Error deleting {}: {}", accn.get("instituteCode"), e.getMessage());

				AccessionOpResponse accessionResponse = new AccessionOpResponse(accn);
				accessionResponse.setError(getDetailedErrorMessage(e));
				response.add(accessionResponse);
			}
		}
		return response;
	}


	/**
	 * Gets the detailed error message: the message of the exception followed by
	 * the messages of all its causes, one per line.
	 *
	 * <p>
	 * An exception without a message is represented by its class name, so this
	 * method never fails on message-less exceptions.
	 * </p>
	 *
	 * @param e the exception
	 * @return the detailed error message
	 */
	private String getDetailedErrorMessage(Throwable e) {
		final StringBuilder sb = new StringBuilder(describe(e));
		Throwable r = e;
		while ((r = r.getCause()) != null) {
			sb.append('\n').append(describe(r));
		}
		return sb.toString();
	}

	/**
	 * Returns the message of the throwable, or its class name when it has none.
	 *
	 * @param t the throwable
	 * @return a non-null description
	 */
	private static String describe(Throwable t) {
		final String message = t.getMessage();
		return message != null ? message : t.getClass().getName();
	}
}