// SGSVUpdate.java
/*
* Copyright 2016 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.genesys.server.service.worker;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.genesys.server.model.genesys.SvalbardDeposit;
import org.genesys.server.service.GenesysService;
import org.genesys.server.service.InstituteService;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
/**
 * Downloads the Svalbard Global Seed Vault (SGSV) seed sample dump from
 * nordgen.org and imports it into the database as {@link SvalbardDeposit}
 * records. Rows are parsed from a tab-separated file and persisted
 * asynchronously in batches via the injected {@link TaskExecutor}.
 */
@Component
public class SGSVUpdate {
    public static final Logger LOG = LoggerFactory.getLogger(SGSVUpdate.class);

    // Rows are handed off for persistence in batches of this size
    private static final int BATCH_SIZE = 100;

    // About 5M is used for buffering the SGSV input stream
    private static final int STREAM_BUFFER_SIZE = 5000000;

    private static final String SGSV_DOWNLOAD_URL = "http://www.nordgen.org/sgsv/download.php?file=/scope/sgsv/files/sgsv_templates.tab";

    /** Expected column headers of the downloaded tab-separated file, in order. */
    static final String[] SGSV_HEADERS = { "sgsv_id", "institute_code", "deposit_box_number", "collection_name", "accession_number", "full_scientific_name",
            "country_of_collection_or_source", "number_of_seeds", "regeneration_month_and_year", "other_accession_designations", "provider_institute_code", "accession_url",
            "country_code", "country_name", "continent_name", "seeds", "genus", "species_epithet", "species", "taxon_name", "date_of_deposit", "date_of_dataset",
            "sgsv_template_id", "box_id", "sgsv_taxon_id", "taxon_authority", "infraspesific_epithet", "vernacular_name", "itis_tsn", "sgsv_genus_id", "accession_name" };

    // Dates in the dump use the yyyy-MM-dd pattern (joda-time formatters are thread-safe)
    private static final DateTimeFormatter SGSV_DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd");

    @Autowired
    private TaskExecutor taskExecutor;
    @Autowired
    private GenesysService genesysService;
    @Autowired
    private InstituteService instituteService;

    /**
     * Triggers a full download-and-import of SGSV data.
     * Restricted to administrators.
     */
    @PreAuthorize("hasRole('ADMINISTRATOR')")
    public void updateSGSV() {
        downloadSgsv();
    }

    /**
     * Downloads the SGSV dump to a temporary file and imports it.
     * All errors are logged, never propagated; the temporary file and the HTTP
     * client are always released in the {@code finally} block.
     */
    public void downloadSgsv() {
        LOG.warn("Importing SGSV data from {}", SGSV_DOWNLOAD_URL);
        final HttpGet httpget = new HttpGet(SGSV_DOWNLOAD_URL);
        File tempFile = null;
        final CloseableHttpClient httpclient = HttpClientBuilder.create().build();
        try {
            tempFile = File.createTempFile("sgsv", ".csv");
            final HttpResponse response = httpclient.execute(httpget);
            LOG.debug(response.getStatusLine().toString());
            // Get hold of the response entity
            final HttpEntity entity = response.getEntity();
            for (final Header header : response.getAllHeaders()) {
                LOG.debug(header.toString());
            }
            LOG.debug("{} {}", entity.getContentType(), entity.getContentLength());
            LOG.warn("Downloading SGSV data to {}", tempFile.getAbsolutePath());
            // try-with-resources: StreamUtils.copy leaves both streams open, so without
            // an explicit close the BufferedOutputStream is never flushed and the last
            // buffer of data could be lost (plus the descriptor would leak).
            try (InputStream content = entity.getContent();
                    BufferedOutputStream output = new BufferedOutputStream(new FileOutputStream(tempFile))) {
                StreamUtils.copy(content, output);
            }
            LOG.warn("Data download completed to {}", tempFile.getAbsolutePath());
            // Release HTTP resources before starting the (potentially long) import
            IOUtils.closeQuietly(httpclient);
            importSGSVStream(new FileInputStream(tempFile));
        } catch (final Throwable e) {
            LOG.error("SGSV import failed to complete.", e);
        } finally {
            // Second closeQuietly is a no-op when the client was already closed above
            IOUtils.closeQuietly(httpclient);
            FileUtils.deleteQuietly(tempFile);
        }
    }

    /**
     * Parses the tab-separated SGSV stream, validates the header row and
     * schedules batches of {@value #BATCH_SIZE} rows for asynchronous
     * persistence. The reader (and thus {@code str}) is closed when done.
     *
     * @param str raw SGSV tab-separated data
     * @throws IOException if reading or parsing fails
     */
    void importSGSVStream(final InputStream str) throws IOException {
        LOG.warn("Starting import of SGSV data from stream");
        int counter = 0;
        CSVReader reader = null;
        try {
            var parser = new CSVParserBuilder()
                    .withSeparator('\t')
                    // quote char 0: the dump is unquoted, disable quote handling entirely
                    .withQuoteChar((char) 0)
                    .withEscapeChar('\\')
                    .withStrictQuotes(false)
                    .withIgnoreQuotations(false)
                    .withIgnoreLeadingWhiteSpace(true)
                    .build();
            // NOTE(review): InputStreamReader uses the platform default charset here;
            // consider StandardCharsets.UTF_8 — confirm the dump's actual encoding first.
            reader = new CSVReaderBuilder(new BufferedReader(new InputStreamReader(str), STREAM_BUFFER_SIZE)).withCSVParser(parser).build();

            final String[] headers = reader.readNext();
            if (!validHeaders(headers)) {
                return;
            }

            final List<String[]> bulk = new ArrayList<>(BATCH_SIZE);
            String[] line = null;
            while ((line = reader.readNext()) != null) {
                if (counter % 1000 == 0) {
                    LOG.info("SGSV CSV line={}: {}", counter, ArrayUtils.toString(line));
                }
                // Clean up: trim and convert "null"/"<null>"/blank fields to real null
                for (int i = 0; i < line.length; i++) {
                    line[i] = line[i].trim();
                    if (line[i].equals("null") || line[i].equals("<null>") || StringUtils.isBlank(line[i])) {
                        line[i] = null;
                    }
                }
                if (line.length > SGSV_HEADERS.length) {
                    LOG.warn("Funny line length {}", Arrays.toString(line));
                }
                bulk.add(line);
                counter++;
                if (counter % BATCH_SIZE == 0) {
                    workIt(bulk);
                    bulk.clear();
                }
            }
            // Flush the final, possibly partial batch
            workIt(bulk);
            bulk.clear();
        } catch (final Throwable e) {
            LOG.error(e.getMessage(), e);
            throw new IOException(e);
        } finally {
            IOUtils.closeQuietly(reader);
        }
        LOG.warn("Done importing SGSV data. Imported rows={}", counter);
    }

    /**
     * Checks that the parsed header row exactly matches {@link #SGSV_HEADERS}
     * in count, order and name.
     *
     * @param headers header row from the CSV, may be {@code null}
     * @return {@code true} when all headers match
     */
    boolean validHeaders(String[] headers) {
        if (headers == null) {
            LOG.warn("null headers received");
            return false;
        }
        LOG.debug("Headers: {}", headers.length);
        if (headers.length != SGSV_HEADERS.length) {
            // Message previously hard-coded "30", but SGSV_HEADERS has 31 entries;
            // report the actual expected count instead.
            LOG.warn("Expected {} headers, got {}", SGSV_HEADERS.length, headers.length);
            return false;
        }
        for (int i = 0; i < SGSV_HEADERS.length; i++) {
            if (!StringUtils.equals(headers[i], SGSV_HEADERS[i])) {
                LOG.warn("SGSV template header mismatch pos={} expected={} found={}", i, SGSV_HEADERS[i], headers[i]);
                return false;
            }
        }
        return true;
    }

    /**
     * Converts a batch of CSV rows to {@link SvalbardDeposit}s and persists
     * them asynchronously, retrying up to 5 times on lock conflicts.
     * The caller's list is copied and cleared, so it may be reused immediately.
     *
     * @param bulk batch of parsed CSV rows; cleared on return
     */
    void workIt(final List<String[]> bulk) {
        // Copy: the async task must not see the caller's reused/cleared list
        List<String[]> copy = new ArrayList<>(bulk);
        bulk.clear();
        if (LOG.isTraceEnabled())
            LOG.trace("Queueing job size={}", copy.size());
        taskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                List<SvalbardDeposit> svalbardDeposits = readSvalbardDeposits(copy);
                if (LOG.isTraceEnabled())
                    LOG.trace("Got SGVS entries size={}", svalbardDeposits.size());
                if (CollectionUtils.isEmpty(svalbardDeposits)) {
                    return;
                }
                for (int retry = 5; retry > 0; retry--) {
                    if (retry < 5) {
                        LOG.info("Trying to save data in attempt #{}", (5 - retry));
                    }
                    try {
                        genesysService.saveSvalbards(svalbardDeposits);
                        if (retry < 5) {
                            LOG.info("Persistence attempt #{} successful", (5 - retry));
                        }
                        // EXIT
                        return;
                    } catch (final CannotAcquireLockException | ObjectOptimisticLockingFailureException e) {
                        LOG.info("Failed to save data, will retry. {}", e.getMessage());
                        try {
                            // Randomized backoff to reduce lock contention between workers
                            Thread.sleep((long) (((retry * 200) + 50) * Math.random()));
                        } catch (InterruptedException interrupted) {
                            // Restore the interrupt flag and stop retrying: the executor
                            // is asking this worker to shut down.
                            Thread.currentThread().interrupt();
                            LOG.warn("Interrupted while waiting to retry, giving up");
                            return;
                        }
                        continue;
                    } catch (final Throwable e) {
                        LOG.error("Failed to save data", e);
                    }
                }
                LOG.error("All persistence retries failed!");
            }
        });
    }

    /**
     * Extract data from one CSV line into a {@link SvalbardDeposit} instance.
     * A row with an unparseable deposit date is kept, just without the date.
     *
     * @param seedSamplesRow one cleaned-up CSV row, columns per {@link #SGSV_HEADERS}
     * @return populated deposit (never {@code null})
     */
    private SvalbardDeposit readSvalbardDeposit(String[] seedSamplesRow) {
        SvalbardDeposit deposit = new SvalbardDeposit();
        deposit.setId(Long.parseLong(seedSamplesRow[0]));
        deposit.setInstitute(instituteService.getInstitute(seedSamplesRow[1]));
        deposit.setBoxNo(seedSamplesRow[2]);
        deposit.setAcceNumb(seedSamplesRow[4]);
        if (seedSamplesRow[7] != null) {
            deposit.setQuantity(Float.parseFloat(seedSamplesRow[7]));
        }
        deposit.setRegenerationInformation(seedSamplesRow[8]);
        deposit.setProvider(instituteService.getInstitute(seedSamplesRow[10]));
        deposit.setGenus(seedSamplesRow[16]);
        try {
            // Parse with joda, then bridge to java.time LocalDate via epoch millis in UTC
            deposit.setDepositDate(LocalDate.ofInstant(Instant.ofEpochMilli(SGSV_DATE_FORMAT.parseDateTime(seedSamplesRow[20]).getMillis()), ZoneOffset.UTC));
        } catch (IllegalArgumentException e) {
            LOG.warn("Illegal date format {} for sgsv_id={}", seedSamplesRow[20], deposit.getId());
        }
        return deposit;
    }

    /**
     * Converts CSV rows to deposits, dropping rows that fail to parse or that
     * have no accession number.
     *
     * @param copy batch of cleaned-up CSV rows
     * @return deposits with a non-null accession number
     */
    public List<SvalbardDeposit> readSvalbardDeposits(List<String[]> copy) {
        List<SvalbardDeposit> svalbardDeposits = copy.stream().map(row -> {
            try {
                return readSvalbardDeposit(row);
            } catch (final Throwable e) {
                // Include the cause so parse failures are diagnosable from the log
                LOG.error("Could not read SvalbardDeposit from {}", Arrays.toString(row), e);
                return null;
            }
        }).filter(entry -> entry != null && entry.getAcceNumb() != null).collect(Collectors.toList());
        return svalbardDeposits;
    }
}