InstituteUpdater.java
/**
* Copyright 2014 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.genesys.server.service.worker;
import static org.genesys.util.NumberUtils.parseDoubleIgnore0;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.genesys.server.model.impl.FaoInstitute;
import org.genesys.server.service.GeoService;
import org.genesys.server.service.InstituteService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.exceptions.CsvValidationException;
/**
 * Downloads the WIEWS institute export ({@link #WIEWS_EXPORT_URL}) and applies it to the
 * local {@link FaoInstitute} records.
 *
 * <p>
 * Rows of the export are grouped into batches of {@code BATCH_SIZE} and every batch is handed
 * to the injected {@link TaskExecutor} for processing.
 * </p>
 */
@Component
public class InstituteUpdater {

    /** URL of the zipped WIEWS institute export. */
    public static final String WIEWS_EXPORT_URL = "https://data.apps.fao.org/static/downloads/wiews_instab/export_c.zip";

    /** Header row that export_c.txt must start with; the order matches the {@code COL_*} indexes. */
    public static final String[] WIEWS_EXPORT_C_HEADERS = { "INSTCODE", "ACRONYM", "ECPACRONYM", "FULL_NAME", "TYPE", "GENEBANK_LONG_TERM_COLLECTIONS",
        "BOTANICAL_GARDEN", "GENEBANK_MEDIUM_TERM_COLLECTIONS", "GENEBANK_SHORT_TERM_COLLECTIONS", "STREET_POB", "CITY_STATE", "ZIP_CODE", "PHONE", "FAX",
        "EMAIL", "URL", "LATITUDE", "LONGITUDE", "ALTITUDE", "UPDATED_ON", "V_INSTCODE", "ISO3" };

    // Zero-based column indexes into one row of export_c.txt
    public static final int COL_INSTCODE = 0;
    public static final int COL_ACRONYM = 1;
    public static final int COL_ECPACRONYM = 2;
    public static final int COL_FULL_NAME = 3;
    public static final int COL_TYPE = 4;
    public static final int COL_GENEBANK_LONG_TERM_COLLECTIONS = 5;
    public static final int COL_BOTANICAL_GARDEN = 6;
    public static final int COL_GENEBANK_MEDIUM_TERM_COLLECTIONS = 7;
    public static final int COL_GENEBANK_SHORT_TERM_COLLECTIONS = 8;
    public static final int COL_STREET_POB = 9;
    public static final int COL_CITY_STATE = 10;
    public static final int COL_ZIP_CODE = 11;
    public static final int COL_PHONE = 12;
    public static final int COL_FAX = 13;
    public static final int COL_EMAIL = 14;
    public static final int COL_URL = 15;
    public static final int COL_LATITUDE = 16;
    public static final int COL_LONGITUDE = 17;
    public static final int COL_ALTITUDE = 18;
    public static final int COL_UPDATED_ON = 19;
    public static final int COL_V_INSTCODE = 20;
    public static final int COL_ISO3 = 21;

    public static final Logger LOG = LoggerFactory.getLogger(InstituteUpdater.class);

    /** Number of export rows handed to the task executor at once. */
    private static final int BATCH_SIZE = 50;

    /** Name of the only entry we accept as the first entry of the downloaded archive. */
    private static final String EXPORT_ENTRY_NAME = "export_c.txt";

    /** URLs longer than this are abbreviated before they are stored. */
    private static final int MAX_URL_LENGTH = 300;

    @Autowired
    private InstituteService instituteService;

    @Autowired
    private GeoService geoService;

    @Autowired
    private TaskExecutor taskExecutor;

    @Autowired
    private AccessionCounter accessionCounter;

    /**
     * Update local FaoInstitute with data from WIEWS database.
     *
     * <p>
     * The export is downloaded, its first ZIP entry is parsed as CSV and the rows are submitted
     * in batches for processing. Rows that have fewer columns than the header or no institute
     * code are skipped with a warning. A {@link RuntimeException} raised while reading the export
     * is logged, the request is aborted and the method returns normally.
     * </p>
     *
     * @throws IOException if the download fails, the archive does not start with export_c.txt,
     *         the header row does not match {@link #WIEWS_EXPORT_C_HEADERS} or the CSV is invalid
     */
    public void updateFaoInstitutes() throws IOException {
        final HttpGet httpget = new HttpGet(WIEWS_EXPORT_URL);
        final CloseableHttpClient httpclient = HttpClientBuilder.create().build();

        try {
            final HttpResponse response = httpclient.execute(httpget);
            LOG.debug("{}", response.getStatusLine());

            // Get hold of the response entity
            final HttpEntity entity = response.getEntity();
            if (entity == null) {
                LOG.warn("No HttpEntity in response, bailing out");
                return;
            }
            LOG.debug("{} {}", entity.getContentType(), entity.getContentLength());

            // Deliberately not try-with-resources: on a RuntimeException the request must be
            // aborted BEFORE the streams are closed in the finally block.
            ZipInputStream instream = null;
            CSVReader reader = null;
            try {
                instream = new ZipInputStream(entity.getContent());

                // getNextEntry() returns null when the stream holds no (further) ZIP entry
                final ZipEntry zipEntry = instream.getNextEntry();
                if (zipEntry == null) {
                    LOG.warn("Expected export_c, but the archive has no entries");
                    throw new IOException("Missing export_c");
                }
                LOG.debug("Got entry: {}", zipEntry.getName());
                if (!EXPORT_ENTRY_NAME.equals(zipEntry.getName())) {
                    LOG.warn("Expected export_c, not {}", zipEntry.getName());
                    throw new IOException("Missing export_c");
                }

                final InputStreamReader inreader = new InputStreamReader(instream, "UTF-8");
                var parser = new CSVParserBuilder()
                    .withSeparator(',')
                    .withQuoteChar('"')
                    .withStrictQuotes(false)
                    .build();
                reader = new CSVReaderBuilder(inreader).withCSVParser(parser).build();

                // Ensure headers match known format. Arrays.equals covers both a missing
                // header row (null) and a different number of columns.
                final String[] headers = reader.readNext();
                LOG.warn("export_c.txt headers: {}", ArrayUtils.toString(headers, "<null>"));
                if (!Arrays.equals(WIEWS_EXPORT_C_HEADERS, headers)) {
                    throw new InstituteUpdateException("export_c.txt headers mismatch: " + ArrayUtils.toString(headers, "<null>"));
                }

                final Map<String, String[]> batch = new HashMap<>(BATCH_SIZE);
                String[] line;
                while ((line = reader.readNext()) != null) {
                    // A short row (e.g. a blank line) cannot be indexed with the COL_* constants
                    if (line.length < WIEWS_EXPORT_C_HEADERS.length) {
                        LOG.warn("Skipping export_c.txt row with {} columns, expected {}: {}", line.length, WIEWS_EXPORT_C_HEADERS.length,
                            ArrayUtils.toString(line, "<null>"));
                        continue;
                    }

                    // Blank cells and the literal text "null" both mean "no value"
                    for (int i = 0; i < line.length; i++) {
                        if (StringUtils.isBlank(line[i]) || "null".equals(line[i])) {
                            line[i] = null;
                        }
                    }

                    final String instCode = line[COL_INSTCODE];
                    if (instCode == null) {
                        LOG.warn("Skipping export_c.txt row without INSTCODE: {}", ArrayUtils.toString(line, "<null>"));
                        continue;
                    }

                    batch.put(instCode, line);
                    if (batch.size() == BATCH_SIZE) {
                        workIt(batch);
                        batch.clear();
                    }
                }

                // Submit the remaining, partially filled batch
                workIt(batch);
                batch.clear();

            } catch (final RuntimeException ex) {
                LOG.error(ex.getMessage(), ex);
                httpget.abort();
            } catch (InstituteUpdateException | CsvValidationException e) {
                throw new IOException(e);
            } finally {
                IOUtils.closeQuietly(reader);
                IOUtils.closeQuietly(instream);
            }

            LOG.info("Done importing WIEWS database");
        } finally {
            IOUtils.closeQuietly(httpclient);
        }
    }

    /**
     * Submit a snapshot of the batch to the task executor. An empty batch is ignored.
     *
     * @param batch export rows keyed by institute code; the caller may clear or reuse the map
     *        as soon as this method returns
     */
    private void workIt(final Map<String, String[]> batch) {
        if (batch.isEmpty()) {
            return;
        }

        // Need copy! The caller clears and refills its map while the task may still be pending.
        final Map<String, String[]> batchCopy = new HashMap<>(batch);
        taskExecutor.execute(() -> processBatch(batchCopy));
    }

    /**
     * Update the institutes that already exist, create the ones that do not, then save them
     * and trigger a recount for each saved institute.
     *
     * @param rowsByCode export rows keyed by institute code; entries are removed from this map
     *        as they are matched with existing institutes
     */
    private void processBatch(final Map<String, String[]> rowsByCode) {
        LOG.info("Processing next batch");

        // All instCodes in the batch
        final Collection<String> instCodes = rowsByCode.keySet();
        // Fetch FaoInstitutes from DB
        final List<FaoInstitute> existing = instituteService.getInstitutes(instCodes);
        final List<FaoInstitute> toSave = new ArrayList<>(instCodes.size());

        for (final FaoInstitute faoInstitute : existing) {
            // Take the row out of the map so that only unknown institutes remain afterwards
            final String[] line = rowsByCode.remove(faoInstitute.getCode());
            if (line == null) {
                LOG.warn("No export row for FaoInstitute with code: {}", faoInstitute.getCode());
                continue;
            }
            if (updateData(faoInstitute, line)) {
                toSave.add(faoInstitute);
            }
        }

        // Whatever is left has no FaoInstitute yet
        for (final var entry : rowsByCode.entrySet()) {
            LOG.info("Adding FaoInstitute with code: {}", entry.getKey());
            toSave.add(insertData(entry.getValue()));
        }

        // Save what needs saving
        if (!toSave.isEmpty()) {
            LOG.info("Updating FaoInstitutes: {}", toSave.size());
            instituteService.update(toSave);
            toSave.forEach(accessionCounter::recountInstitute);
        }
    }

    /**
     * Create a new institute from an export row.
     *
     * @param line one normalized row of export_c.txt
     * @return a new, not yet saved FaoInstitute
     */
    private FaoInstitute insertData(String[] line) {
        final FaoInstitute faoInstitute = new FaoInstitute();
        updateData(faoInstitute, line);
        return faoInstitute;
    }

    /**
     * Copy the values of an export row to the institute.
     *
     * @param faoInstitute the institute to modify
     * @param line one normalized row of export_c.txt, indexed by the {@code COL_*} constants
     * @return always {@code true}: no change detection is done, so every institute is saved
     */
    private boolean updateData(FaoInstitute faoInstitute, String[] line) {
        final String instCode = line[COL_INSTCODE];
        final String acronym = line[COL_ACRONYM];
        final String fullName = line[COL_FULL_NAME];
        final String type = line[COL_TYPE];
        final boolean maintColl = "1".equals(line[COL_GENEBANK_LONG_TERM_COLLECTIONS]) || "1".equals(line[COL_GENEBANK_MEDIUM_TERM_COLLECTIONS])
            || "1".equals(line[COL_GENEBANK_SHORT_TERM_COLLECTIONS]);
        final boolean pgrActivity = maintColl || "1".equals(line[COL_BOTANICAL_GARDEN]);
        final String email = line[COL_EMAIL];
        final String url = line[COL_URL];
        final String latitude = line[COL_LATITUDE];
        final String longitude = line[COL_LONGITUDE];
        final String elevation = line[COL_ALTITUDE];
        final String vInstCode = line[COL_V_INSTCODE];
        final String isoCountry = line[COL_ISO3];

        // The code is only assigned to institutes that do not have one yet
        if (faoInstitute.getCode() == null) {
            faoInstitute.setCode(instCode);
        }

        faoInstitute.setAcronym(acronym);
        faoInstitute.setFullName(fullName);
        LOG.trace("Updating: code={} fullname='{}' acronym={}", instCode, fullName, acronym);

        faoInstitute.setPgrActivity(pgrActivity);
        faoInstitute.setMaintainsCollection(maintColl);
        faoInstitute.setEmail(email);
        faoInstitute.setType(type);

        // abbreviate() only shortens values longer than the limit, so warn for exactly those
        if (url != null && url.length() > MAX_URL_LENGTH) {
            LOG.warn("Trimming URL of {} to 300 characters: {}", instCode, url);
        }
        faoInstitute.setUrl(StringUtils.abbreviate(url, MAX_URL_LENGTH));

        final Double lat = parseDoubleIgnore0(latitude, 1);
        faoInstitute.setLatitude(lat);
        final Double lon = parseDoubleIgnore0(longitude, 1);
        faoInstitute.setLongitude(lon);
        final Double elev = parseDoubleIgnore0(elevation, 1);
        faoInstitute.setElevation(elev);

        faoInstitute.setVCode(vInstCode);
        // Current when no V_INSTCODE is given or when it is the institute's own code
        faoInstitute.setCurrent(vInstCode == null || StringUtils.equals(instCode, vInstCode));

        // The country is always re-resolved from the ISO3 code of the export
        faoInstitute.setCountry(geoService.getCurrentCountry(isoCountry));

        return true;
    }

    /**
     * Signals that the WIEWS export cannot be imported, e.g. because its header row is not the
     * expected one.
     */
    public static class InstituteUpdateException extends Exception {
        private static final long serialVersionUID = 1L;

        /**
         * @param message description of the problem
         */
        public InstituteUpdateException(String message) {
            super(message);
        }
    }
}