// InstituteUpdater.java

/**
 * Copyright 2014 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.genesys.server.service.worker;

import static org.genesys.util.NumberUtils.parseDoubleIgnore0;

import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.genesys.server.model.impl.FaoInstitute;
import org.genesys.server.service.GeoService;
import org.genesys.server.service.InstituteService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;

import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.exceptions.CsvValidationException;

@Component
public class InstituteUpdater {
	public static final String WIEWS_EXPORT_URL = "https://data.apps.fao.org/static/downloads/wiews_instab/export_c.zip";

	public static final String[] WIEWS_EXPORT_C_HEADERS = { "INSTCODE", "ACRONYM", "ECPACRONYM", "FULL_NAME", "TYPE", "GENEBANK_LONG_TERM_COLLECTIONS",
			"BOTANICAL_GARDEN", "GENEBANK_MEDIUM_TERM_COLLECTIONS", "GENEBANK_SHORT_TERM_COLLECTIONS", "STREET_POB", "CITY_STATE", "ZIP_CODE", "PHONE", "FAX",
			"EMAIL", "URL", "LATITUDE", "LONGITUDE", "ALTITUDE", "UPDATED_ON", "V_INSTCODE", "ISO3" };

	public final static int COL_INSTCODE = 0;
	public final static int COL_ACRONYM = 1;
	public final static int COL_ECPACRONYM = 2;
	public final static int COL_FULL_NAME = 3;
	public final static int COL_TYPE = 4;
	public final static int COL_GENEBANK_LONG_TERM_COLLECTIONS = 5;
	public final static int COL_BOTANICAL_GARDEN = 6;
	public final static int COL_GENEBANK_MEDIUM_TERM_COLLECTIONS = 7;
	public final static int COL_GENEBANK_SHORT_TERM_COLLECTIONS = 8;
	public final static int COL_STREET_POB = 9;
	public final static int COL_CITY_STATE = 10;
	public final static int COL_ZIP_CODE = 11;
	public final static int COL_PHONE = 12;
	public final static int COL_FAX = 13;
	public final static int COL_EMAIL = 14;
	public final static int COL_URL = 15;
	public final static int COL_LATITUDE = 16;
	public final static int COL_LONGITUDE = 17;
	public final static int COL_ALTITUDE = 18;
	public final static int COL_UPDATED_ON = 19;
	public final static int COL_V_INSTCODE = 20;
	public final static int COL_ISO3 = 21;

	public static final Logger LOG = LoggerFactory.getLogger(InstituteUpdater.class);

	@Autowired
	private InstituteService instituteService;

	@Autowired
	private GeoService geoService;

	@Autowired
	private TaskExecutor taskExecutor;

	@Autowired
	private AccessionCounter accessionCounter;

	private static final int BATCH_SIZE = 50;

	/**
	 * Update local FaoInstitute with data from WIEWS database
	 *
	 * @throws IOException
	 */
	public void updateFaoInstitutes() throws IOException {

		final HttpGet httpget = new HttpGet(WIEWS_EXPORT_URL);

		HttpResponse response = null;

		final CloseableHttpClient httpclient = HttpClientBuilder.create().build();
		try {
			response = httpclient.execute(httpget);

			LOG.debug(response.getStatusLine().toString());

			// Get hold of the response entity
			final HttpEntity entity = response.getEntity();
			if (entity == null) {
				LOG.warn("No HttpEntity in response, bailing out");
				return;
			}
			LOG.debug("{} {}", entity.getContentType(), entity.getContentLength());

			// If the response does not enclose an entity, there is no
			// need
			// to bother about connection release
			if (entity != null) {
				ZipInputStream instream = null;
				CSVReader reader = null;

				try {
					instream = new ZipInputStream(entity.getContent());
					final ZipEntry zipEntry = instream.getNextEntry();
					LOG.debug("Got entry: {}", zipEntry.getName());
					if (!zipEntry.getName().equals("export_c.txt")) {
						LOG.warn("Expected export_c, not {}", zipEntry.getName());
						throw new IOException("Missing export_c");
					}

					final InputStreamReader inreader = new InputStreamReader(instream, "UTF-8");
					var parser = new CSVParserBuilder()
						.withSeparator(',')
						.withQuoteChar('"')
						.withStrictQuotes(false)
						.build();
					
					reader = new CSVReaderBuilder(inreader).withCSVParser(parser).build();

					// Ensure headers match known format
					final String[] headers = reader.readNext();
					LOG.warn("export_c.txt headers: {}", ArrayUtils.toString(headers, "<null>"));

					if (WIEWS_EXPORT_C_HEADERS.length != headers.length || !Arrays.equals(WIEWS_EXPORT_C_HEADERS, headers)) {
						throw new InstituteUpdateException("export_c.txt headers mismatch: " + ArrayUtils.toString(headers, "<null>"));
					}

					final Map<String, String[]> batch = new HashMap<String, String[]>(BATCH_SIZE);

					String[] line = null;
					while ((line = reader.readNext()) != null) {
						for (int i = 0; i < line.length; i++) {
							if (line[i].equals("null") || StringUtils.isBlank(line[i])) {
								line[i] = null;
							}
						}
						// if (StringUtils.isNotBlank(line[14])) {
						// LOG.info(ArrayUtils.toString(line));
						// }

						final String instCode = line[COL_INSTCODE];
						batch.put(instCode, line);

						if (batch.size() == BATCH_SIZE) {
							workIt(batch);
							batch.clear();
						}

					}
					workIt(batch);
					batch.clear();

					reader.close();

				} catch (final RuntimeException ex) {
					LOG.error(ex.getMessage(), ex);
					httpget.abort();
				} catch (InstituteUpdateException | CsvValidationException e) {
					throw new IOException(e);

				} finally {
					IOUtils.closeQuietly(reader);
					IOUtils.closeQuietly(instream);
				}
			}

			LOG.info("Done importing WIEWS database");
		} finally {
			IOUtils.closeQuietly(httpclient);
		}
	}

	private void workIt(final Map<String, String[]> batch) {

		// Need copy!
		final Map<String, String[]> batchCopy = new HashMap<String, String[]>(batch);

		taskExecutor.execute(new Runnable() {
			@Override
			public void run() {
				LOG.info("Processing next batch");

				// All instCodes in the batch
				final Collection<String> instCodes = batchCopy.keySet();

				// Fetch FaoInstitutes from DB
				final List<FaoInstitute> existing = instituteService.getInstitutes(instCodes);
				final List<FaoInstitute> toSave = new ArrayList<FaoInstitute>(instCodes.size());

				for (final FaoInstitute faoInstitute : existing) {

					final String[] line = batchCopy.get(faoInstitute.getCode());
					// Remove it
					batchCopy.remove(faoInstitute.getCode());

					if (updateData(faoInstitute, line)) {
						toSave.add(faoInstitute);
					}
				}

				for (final var entry : batchCopy.entrySet()) {
					LOG.info("Adding FaoInstitute with code: {}", entry.getKey());
					toSave.add(insertData(entry.getValue()));
				}

				// Save what needs saving
				if (toSave.size() > 0) {
					LOG.info("Updating FaoInstitutes: {}", toSave.size());
					instituteService.update(toSave);
					toSave.forEach(accessionCounter::recountInstitute);
				}
			}

			private FaoInstitute insertData(String[] line) {
				final FaoInstitute faoInstitute = new FaoInstitute();
				updateData(faoInstitute, line);
				return faoInstitute;
			}

			private boolean updateData(FaoInstitute faoInstitute, String[] line) {
				final String instCode = line[COL_INSTCODE];
				final String acronym = line[COL_ACRONYM];
				// final String ecpaAcronym = line[COL_ECPACRONYM];
				final String fullName = line[COL_FULL_NAME];
				final String type = line[COL_TYPE];
				final boolean maintColl = "1".equals(line[COL_GENEBANK_LONG_TERM_COLLECTIONS]) || "1".equals(line[COL_GENEBANK_MEDIUM_TERM_COLLECTIONS])
						|| "1".equals(line[COL_GENEBANK_SHORT_TERM_COLLECTIONS]);
				final boolean pgrActivity = maintColl || "1".equals(line[COL_BOTANICAL_GARDEN]);
				// final String streetPob = line[COL_STREET_POB];
				// final String cityState = line[COL_CITY_STATE];
				// final String zipCode = line[COL_ZIP_CODE];
				// final String phone = line[COL_PHONE];
				// final String fax = line[COL_FAX];
				final String email = line[COL_EMAIL];
				String url = line[COL_URL];
				final String latitude = line[COL_LATITUDE];
				final String longitude = line[COL_LONGITUDE];
				final String elevation = line[COL_ALTITUDE];
				// final String updatedOn = line[COL_UPDATED_ON];
				final String vInstCode = line[COL_V_INSTCODE];
				final String isoCountry = line[COL_ISO3];

				if (faoInstitute.getCode() == null) {
					faoInstitute.setCode(instCode);
				}
				faoInstitute.setAcronym(acronym);
				faoInstitute.setFullName(fullName);
				if (LOG.isTraceEnabled()) {
					LOG.trace("Updating: code={} fullname='{}' acronym={}", instCode, fullName, acronym);
				}
				faoInstitute.setPgrActivity(pgrActivity);
				faoInstitute.setMaintainsCollection(maintColl);
				faoInstitute.setEmail(email);
				faoInstitute.setType(type);
				if (url != null && url.length() > 299) {
					LOG.warn("Trimming URL of {} to 300 characters: {}", instCode, url);
				}
				faoInstitute.setUrl(StringUtils.abbreviate(url, 300));
				final Double lat = parseDoubleIgnore0(latitude, 1);
				faoInstitute.setLatitude(lat);
				final Double lon = parseDoubleIgnore0(longitude, 1);
				faoInstitute.setLongitude(lon);
				final Double elev = parseDoubleIgnore0(elevation, 1);
				faoInstitute.setElevation(elev);
				faoInstitute.setVCode(vInstCode);
				faoInstitute.setCurrent(vInstCode == null || StringUtils.equals(instCode, vInstCode));

				// Update institute country if null or when not matching the
				// code
				faoInstitute.setCountry(geoService.getCurrentCountry(isoCountry));

				return true;
			}
		});
	}

	public static class InstituteUpdateException extends Exception {
		private static final long serialVersionUID = 1L;

		public InstituteUpdateException(String message) {
			super(message);
		}

	}
}