SGSVUpdate.java

/*
 * Copyright 2016 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.genesys.server.service.worker;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.genesys.server.model.genesys.SvalbardDeposit;
import org.genesys.server.service.GenesysService;
import org.genesys.server.service.InstituteService;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils;

import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;

/**
 * Downloads the SGSV (Svalbard Global Seed Vault) seed sample data as a
 * tab-separated file, parses it and hands the rows over to
 * {@link GenesysService#saveSvalbards} in batches of {@link #BATCH_SIZE} rows.
 * Batches are persisted asynchronously through the injected
 * {@link TaskExecutor}.
 */
@Component
public class SGSVUpdate {
	public static final Logger LOG = LoggerFactory.getLogger(SGSVUpdate.class);

	// Handle data in batches of 100
	private static final int BATCH_SIZE = 100;

	// About 5M is used for buffering the SGSV input stream
	private static final int STREAM_BUFFER_SIZE = 5000000;

	// Total number of persistence attempts made for one batch
	private static final int MAX_SAVE_ATTEMPTS = 5;

	private static final String SGSV_DOWNLOAD_URL = "http://www.nordgen.org/sgsv/download.php?file=/scope/sgsv/files/sgsv_templates.tab";

	// Expected header row of the SGSV file; positions are relied upon by readSvalbardDeposit
	static final String[] SGSV_HEADERS = { "sgsv_id", "institute_code", "deposit_box_number", "collection_name", "accession_number", "full_scientific_name",
			"country_of_collection_or_source", "number_of_seeds", "regeneration_month_and_year", "other_accession_designations", "provider_institute_code", "accession_url",
			"country_code", "country_name", "continent_name", "seeds", "genus", "species_epithet", "species", "taxon_name", "date_of_deposit", "date_of_dataset",
			"sgsv_template_id", "box_id", "sgsv_taxon_id", "taxon_authority", "infraspesific_epithet", "vernacular_name", "itis_tsn", "sgsv_genus_id", "accession_name" };

	private static final DateTimeFormatter SGSV_DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd");

	@Autowired
	private TaskExecutor taskExecutor;

	@Autowired
	private GenesysService genesysService;

	@Autowired
	private InstituteService instituteService;

	/**
	 * Entry point restricted to administrators; delegates to {@link #downloadSgsv()}.
	 */
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public void updateSGSV() {
		downloadSgsv();
	}

	/**
	 * Download the SGSV file from {@code SGSV_DOWNLOAD_URL} to a temporary file
	 * and import it with {@link #importSGSVStream(InputStream)}.
	 *
	 * This method is best-effort: every failure is logged and swallowed, and the
	 * temporary file is always deleted before returning.
	 */
	public void downloadSgsv() {
		LOG.warn("Importing SGSV data from {}", SGSV_DOWNLOAD_URL);

		File tempFile = null;
		try {
			tempFile = File.createTempFile("sgsv", ".csv");

			// Closing the client releases the connection used by the response
			try (CloseableHttpClient httpclient = HttpClientBuilder.create().build()) {
				final HttpResponse response = httpclient.execute(new HttpGet(SGSV_DOWNLOAD_URL));

				LOG.debug(response.getStatusLine().toString());
				for (final Header header : response.getAllHeaders()) {
					LOG.debug(header.toString());
				}

				// Do not store and parse an error page
				final int status = response.getStatusLine().getStatusCode();
				if (status < 200 || status >= 300) {
					LOG.error("SGSV download failed with HTTP status {}", status);
					return;
				}

				// Get hold of the response entity
				final HttpEntity entity = response.getEntity();
				if (entity == null) {
					LOG.error("SGSV download returned no content");
					return;
				}

				LOG.debug("{} {}", entity.getContentType(), entity.getContentLength());
				LOG.warn("Downloading SGSV data to {}", tempFile.getAbsolutePath());
				// StreamUtils.copy leaves both streams open, so close them here
				try (InputStream content = entity.getContent();
						BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(tempFile))) {
					StreamUtils.copy(content, out);
				}
				LOG.warn("Data download completed to {}", tempFile.getAbsolutePath());
			}

			// The HTTP client is closed at this point; the import closes the file stream
			importSGSVStream(new FileInputStream(tempFile));
		} catch (final Throwable e) {
			LOG.error("SGSV import failed to complete.", e);
		} finally {
			FileUtils.deleteQuietly(tempFile);
		}
	}

	/**
	 * Parse tab-separated SGSV data from the stream and queue it for persistence
	 * in batches of {@link #BATCH_SIZE} rows. The first row must match
	 * {@link #SGSV_HEADERS}, otherwise nothing is imported. The stream is always
	 * closed by this method.
	 *
	 * @param str stream with SGSV data
	 * @throws IOException wrapping any failure raised while reading or queueing the data
	 */
	void importSGSVStream(final InputStream str) throws IOException {
		LOG.warn("Starting import of SGSV data from stream");

		int counter = 0;
		// Outer resource guarantees the stream is closed even if building the reader fails
		try (InputStream in = str) {
			var parser = new CSVParserBuilder()
				.withSeparator('\t')
				.withQuoteChar((char) 0)
				.withEscapeChar('\\')
				.withStrictQuotes(false)
				.withIgnoreQuotations(false)
				.withIgnoreLeadingWhiteSpace(true)
				.build();

			// NOTE(review): the reader uses the platform default charset; confirm the
			// encoding of the SGSV file and specify it explicitly.
			try (CSVReader reader = new CSVReaderBuilder(new BufferedReader(new InputStreamReader(in), STREAM_BUFFER_SIZE)).withCSVParser(parser).build()) {

				final String[] headers = reader.readNext();
				if (!validHeaders(headers)) {
					return;
				}

				final List<String[]> bulk = new ArrayList<>(BATCH_SIZE);

				String[] line = null;
				while ((line = reader.readNext()) != null) {
					if (counter % 1000 == 0) {
						LOG.info("SGSV CSV line={}: {}", counter, ArrayUtils.toString(line));
					}

					cleanUp(line);

					if (line.length > SGSV_HEADERS.length) {
						LOG.warn("Funny line length {}", Arrays.toString(line));
					}

					bulk.add(line);
					counter++;

					if (counter % BATCH_SIZE == 0) {
						// workIt takes a copy and clears the list
						workIt(bulk);
					}
				}

				// Flush the remaining rows, if any
				if (!bulk.isEmpty()) {
					workIt(bulk);
				}
			}
		} catch (final Throwable e) {
			LOG.error(e.getMessage(), e);
			throw new IOException(e);
		}

		LOG.warn("Done importing SGSV data. Imported rows={}", counter);
	}

	/**
	 * Trim every value in place and replace blank values and the literals
	 * {@code null} and {@code <null>} with a {@code null} reference.
	 */
	private static void cleanUp(final String[] line) {
		for (int i = 0; i < line.length; i++) {
			final String value = StringUtils.trim(line[i]);
			if (StringUtils.isBlank(value) || value.equals("null") || value.equals("<null>")) {
				line[i] = null;
			} else {
				line[i] = value;
			}
		}
	}

	/**
	 * Check that the header row matches {@link #SGSV_HEADERS} exactly, in length,
	 * order and spelling.
	 *
	 * @param headers the first row of the file, possibly {@code null}
	 * @return {@code true} when the headers match; {@code false} (with a logged
	 *         warning) otherwise
	 */
	boolean validHeaders(String[] headers) {
		if (headers == null) {
			LOG.warn("null headers received");
			return false;
		}
		LOG.debug("Headers: {}", headers.length);
		if (headers.length != SGSV_HEADERS.length) {
			LOG.warn("Expected {} headers, got {}", SGSV_HEADERS.length, headers.length);
			return false;
		}
		for (int i = 0; i < SGSV_HEADERS.length; i++) {
			if (!StringUtils.equals(headers[i], SGSV_HEADERS[i])) {
				LOG.warn("SGSV template header mismatch pos={} expected={} found={}", i, SGSV_HEADERS[i], headers[i]);
				return false;
			}
		}
		return true;
	}

	/**
	 * Queue one batch of rows for conversion and persistence on the task
	 * executor. The rows are copied and {@code bulk} is cleared before this
	 * method returns, so the caller can reuse the list immediately.
	 *
	 * @param bulk cleaned CSV rows; emptied by this call
	 */
	void workIt(final List<String[]> bulk) {
		// Need copy! The job runs asynchronously while the caller refills the list.
		final List<String[]> copy = new ArrayList<>(bulk);
		bulk.clear();

		LOG.trace("Queueing job size={}", copy.size());

		taskExecutor.execute(() -> {
			final List<SvalbardDeposit> svalbardDeposits = readSvalbardDeposits(copy);

			LOG.trace("Got SGVS entries size={}", svalbardDeposits.size());

			if (CollectionUtils.isEmpty(svalbardDeposits)) {
				return;
			}

			saveWithRetries(svalbardDeposits);
		});
	}

	/**
	 * Persist the deposits, making up to {@link #MAX_SAVE_ATTEMPTS} attempts.
	 * After a locking failure the thread sleeps for a random interval before the
	 * next attempt; an interrupt during that sleep stops further attempts.
	 */
	private void saveWithRetries(final List<SvalbardDeposit> svalbardDeposits) {
		for (int retry = MAX_SAVE_ATTEMPTS; retry > 0; retry--) {
			// 0 for the initial attempt, 1.. for the retries
			final int retryNo = MAX_SAVE_ATTEMPTS - retry;
			if (retryNo > 0) {
				LOG.info("Trying to save data in attempt #{}", retryNo);
			}

			try {
				genesysService.saveSvalbards(svalbardDeposits);
				if (retryNo > 0) {
					LOG.info("Persistence attempt #{} successful", retryNo);
				}
				// EXIT
				return;

			} catch (final CannotAcquireLockException | ObjectOptimisticLockingFailureException e) {
				LOG.info("Failed to save data, will retry. {}", e.getMessage());
				try {
					// Randomized back-off; the upper bound shrinks with every attempt
					Thread.sleep((long) (((retry * 200) + 50) * Math.random()));
				} catch (final InterruptedException interrupted) {
					// Restore the flag so the executor can observe the interruption
					Thread.currentThread().interrupt();
					LOG.warn("Interrupted while waiting to retry, giving up on batch of size={}", svalbardDeposits.size());
					return;
				}
			} catch (final Throwable e) {
				// NOTE(review): other failures are retried immediately as well; confirm this is intended
				LOG.error("Failed to save data", e);
			}
		}

		LOG.error("All persistence retries failed!");
	}

	/**
	 * Extract data from CSV line into SvalbardDeposit instance. Column positions
	 * follow {@link #SGSV_HEADERS}.
	 *
	 * @param seedSamplesRow cleaned CSV row
	 * @return the populated deposit; the deposit date stays unset when the date
	 *         column is missing or cannot be parsed
	 */
	private SvalbardDeposit readSvalbardDeposit(String[] seedSamplesRow) {
		SvalbardDeposit deposit = new SvalbardDeposit();
		deposit.setId(Long.parseLong(seedSamplesRow[0])); // sgsv_id
		deposit.setInstitute(instituteService.getInstitute(seedSamplesRow[1])); // institute_code
		deposit.setBoxNo(seedSamplesRow[2]); // deposit_box_number
		deposit.setAcceNumb(seedSamplesRow[4]); // accession_number
		if (seedSamplesRow[7] != null) { // number_of_seeds
			deposit.setQuantity(Float.parseFloat(seedSamplesRow[7]));
		}
		deposit.setRegenerationInformation(seedSamplesRow[8]); // regeneration_month_and_year
		deposit.setProvider(instituteService.getInstitute(seedSamplesRow[10])); // provider_institute_code
		deposit.setGenus(seedSamplesRow[16]); // genus

		// date_of_deposit: a missing value must not discard the whole row
		final String depositDate = seedSamplesRow[20];
		if (depositDate != null) {
			try {
				deposit.setDepositDate(LocalDate.ofInstant(Instant.ofEpochMilli(SGSV_DATE_FORMAT.parseDateTime(depositDate).getMillis()), ZoneOffset.UTC));
			} catch (IllegalArgumentException e) {
				LOG.warn("Illegal date format {} for sgsv_id={}", depositDate, deposit.getId());
			}
		}
		return deposit;
	}

	/**
	 * Convert CSV rows to {@link SvalbardDeposit} instances. Rows that fail to
	 * convert are logged and skipped, as are deposits without an accession
	 * number.
	 *
	 * @param copy cleaned CSV rows
	 * @return the successfully converted deposits, possibly empty
	 */
	public List<SvalbardDeposit> readSvalbardDeposits(List<String[]> copy) {
		return copy.stream().map(row -> {
			try {
				return readSvalbardDeposit(row);
			} catch (final Throwable e) {
				// Include the cause so the reason for skipping the row is visible
				LOG.error("Could not read SvalbardDeposit from {}: {}", Arrays.toString(row), e.toString());
				return null;
			}
		}).filter(entry -> entry != null && entry.getAcceNumb() != null).collect(Collectors.toList());
	}
}