ITPGRFAStatusUpdater.java

/*
 * Copyright 2019 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.genesys.server.service.worker;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.genesys.server.model.impl.Country;
import org.genesys.server.model.impl.ITPGRFAStatus;
import org.genesys.server.service.GeoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;

import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.exceptions.CsvValidationException;

/**
 * Update country ITPGRFA status by fetching data from
 * {@link ITPGRFAStatusUpdater#ITPGRFA_STATUS_URL}.
 *
 * <p>
 * The CSV is parsed on the calling thread. Its rows are grouped into batches,
 * and each batch is handed to the injected {@link TaskExecutor}. Depending on
 * that executor, {@link #downloadAndUpdate()} may return before all countries
 * have been updated.
 * </p>
 *
 * @author Matija Obreza
 */
@Component
public class ITPGRFAStatusUpdater {

	/**
	 * Source of the ITPGRFA membership CSV export.
	 * <p>
	 * NOTE(review): public and non-final; if nothing reassigns it (e.g. tests),
	 * consider making it {@code final}.
	 */
	public static String ITPGRFA_STATUS_URL = "http://www.fao.org/plant-treaty/countries/membership/export-membership/en/";

	public static final Logger LOG = LoggerFactory.getLogger(ITPGRFAStatusUpdater.class);

	@Autowired
	private GeoService geoService;

	@Autowired
	private TaskExecutor taskExecutor;

	/** Number of CSV rows submitted to the {@link TaskExecutor} per task. */
	private static final int BATCH_SIZE = 20;

	/** Expected leading header cells of the CSV, in order. Extra trailing columns are tolerated. */
	private static final String[] CSV_HEADERS = { "Country", "ISO3", "Region", "Contracting Party", "Membership", "By", "Income", "Development", "Signature date",
			"Accession date", "Ratification date", "Approval date", "Acceptance date", "Entry into Force" };

	protected static final int COLUMN_COUNTRY_CODE3 = 1;
	protected static final int COLUMN_CONTRACTING_PARTY = 3;
	protected static final int COLUMN_MEMBERSHIP = 4;
	protected static final int COLUMN_MEMBERSHIP_BY = 5;

	/** Minimum row length: {@link #COLUMN_MEMBERSHIP_BY} is the highest column index read per row. */
	private static final int MIN_COLUMNS = COLUMN_MEMBERSHIP_BY + 1;

	/** Unicode byte order mark, which a UTF-8 decoder keeps at the start of the first cell. */
	private static final char BOM = '\uFEFF';

	/**
	 * Download the CSV from {@link #ITPGRFA_STATUS_URL} and update local
	 * {@link ITPGRFAStatus} entries with its data.
	 * <p>
	 * If the response carries no entity, a warning is logged and nothing is updated.
	 *
	 * @throws IOException if the request fails, the server answers with a non-2xx
	 *         status, or the CSV cannot be read or has unexpected headers
	 */
	public void downloadAndUpdate() throws IOException {
		final HttpGet httpget = new HttpGet(ITPGRFA_STATUS_URL);

		// try-with-resources releases the client and the response on every exit path, including the early return
		try (CloseableHttpClient httpclient = HttpClientBuilder.create().build(); var response = httpclient.execute(httpget)) {
			final int statusCode = response.getStatusLine().getStatusCode();
			if (statusCode < 200 || statusCode >= 300) {
				// An error page is not CSV: fail with a clear message instead of a misleading header mismatch
				throw new IOException("Unexpected HTTP status " + statusCode + " from " + ITPGRFA_STATUS_URL);
			}

			// Get hold of the response entity
			final HttpEntity entity = response.getEntity();
			if (entity == null) {
				LOG.warn("No HttpEntity in response, bailing out");
				return;
			}
			LOG.debug("{} {}", entity.getContentType(), entity.getContentLength());

			try (InputStream itpgrfaCSVStream = new BufferedInputStream(entity.getContent())) {
				updateFromStream(itpgrfaCSVStream);
			}

		} catch (final ClientProtocolException e) {
			LOG.error(e.getMessage(), e);
			throw new IOException(e);
		} catch (final IOException e) {
			LOG.error(e.getMessage(), e);
			throw e;
		}
	}

	/**
	 * Parse the ITPGRFA CSV and submit its data rows to {@link #workIt(List)} in
	 * batches of {@link #BATCH_SIZE}.
	 *
	 * @param instream CSV content; closed by this method. NOTE(review): decoded as
	 *        UTF-8 — confirm this matches the export's encoding.
	 * @throws IOException if the stream cannot be read or parsed, is empty, or its
	 *         header row does not start with {@link #CSV_HEADERS}
	 */
	private void updateFromStream(final InputStream instream) throws IOException {
		var parser = new CSVParserBuilder()
			.withSeparator(',')
			.withQuoteChar('"')
			.withEscapeChar('\\')
			.withStrictQuotes(false)
			.withIgnoreQuotations(false)
			.withIgnoreLeadingWhiteSpace(true)
			.build();

		try (CSVReader reader = new CSVReaderBuilder(new BufferedReader(new InputStreamReader(instream, StandardCharsets.UTF_8))).withCSVParser(parser).build()) {
			validateHeaders(reader.readNext());

			// Timer
			final StopWatch stopWatch = new StopWatch();
			stopWatch.start();

			final List<String[]> batch = new ArrayList<>(BATCH_SIZE);
			String[] line;
			while ((line = reader.readNext()) != null) {
				blankToNull(line);

				if (LOG.isDebugEnabled()) {
					LOG.debug(">>> {}", ArrayUtils.toString(line, "NULL"));
				}

				if (line.length < MIN_COLUMNS) {
					// workIt indexes up to COLUMN_MEMBERSHIP_BY; a short row would throw inside the executor task
					// and abort the remaining rows of its batch
					LOG.warn("Skipping CSV row with {} column(s): {}", line.length, ArrayUtils.toString(line, "NULL"));
					continue;
				}

				batch.add(line);
				if (batch.size() >= BATCH_SIZE) {
					workIt(batch);
					batch.clear();
				}
			}

			if (!batch.isEmpty()) {
				LOG.debug("Have items in the batch after loop.");
				workIt(batch);
				batch.clear();
			}

			stopWatch.stop();
			// Updates are handed to the TaskExecutor, so they may still be running at this point
			LOG.info("Finished reading ITPGRFA status CSV and submitting updates in {}ms", stopWatch.getTime());

		} catch (final CsvValidationException e) {
			throw new IOException(e);
		}
	}

	/**
	 * Check that the CSV header row starts with {@link #CSV_HEADERS}.
	 *
	 * @param headers the first row returned by the CSV reader; {@code null} if the input was empty
	 * @throws IOException if the header row is missing, too short, or any expected cell differs
	 */
	private static void validateHeaders(final String[] headers) throws IOException {
		if (headers == null) {
			throw new IOException("CSV is empty, expected a header row");
		}
		LOG.info("Got headers: {}", ArrayUtils.toString(headers));

		if (headers.length < CSV_HEADERS.length) {
			throw new IOException("CSV header count mismatch. Found: " + ArrayUtils.toString(headers));
		}

		for (int i = 0; i < CSV_HEADERS.length; i++) {
			String found = headers[i];
			if (i == 0 && found != null && !found.isEmpty() && found.charAt(0) == BOM) {
				// Tolerate a UTF-8 byte order mark in front of the first header cell
				found = found.substring(1);
			}
			if (!CSV_HEADERS[i].equals(found)) {
				throw new IOException("CSV header mismatch, found '" + found + "' instead of '" + CSV_HEADERS[i] + "'");
			}
		}
	}

	/**
	 * Replace blank cells and cells containing the literal text {@code "null"}
	 * with {@code null}, in place.
	 *
	 * @param row the CSV row to normalize
	 */
	private static void blankToNull(final String[] row) {
		for (int i = 0; i < row.length; i++) {
			if ("null".equals(row[i]) || StringUtils.isBlank(row[i])) {
				row[i] = null;
			}
		}
	}

	/**
	 * Submit a snapshot of {@code batch} to the {@link TaskExecutor}.
	 * <p>
	 * Every row with a non-blank ISO3 code is passed to
	 * {@link #updateCountry(String, String, String, String)}. A runtime failure
	 * for one row is logged and does not stop the rest of the batch.
	 *
	 * @param batch rows with at least {@link #MIN_COLUMNS} cells; copied, so the
	 *        caller may clear and reuse the list immediately
	 */
	private void workIt(final List<String[]> batch) {
		// Snapshot: the caller clears and refills batch while the task may still be running
		final List<String[]> batchCopy = List.copyOf(batch);

		taskExecutor.execute(() -> {
			for (final String[] line : batchCopy) {
				if (LOG.isDebugEnabled()) {
					LOG.debug("Working on {}", ArrayUtils.toString(line, "NULL"));
				}
				if (StringUtils.isBlank(line[COLUMN_COUNTRY_CODE3])) {
					continue;
				}
				try {
					updateCountry(line[COLUMN_COUNTRY_CODE3], line[COLUMN_CONTRACTING_PARTY], line[COLUMN_MEMBERSHIP], line[COLUMN_MEMBERSHIP_BY]);
				} catch (final RuntimeException e) {
					// One failing country must not prevent the remaining rows of this batch from being processed
					LOG.error("Failed to update ITPGRFA status for {}: {}", line[COLUMN_COUNTRY_CODE3], e.getMessage(), e);
				}
			}
		});
	}

	/**
	 * Look up a country and pass the given ITPGRFA values to
	 * {@link GeoService#updateITPGRFA}.
	 * <p>
	 * If {@link GeoService#findCountry(String)} returns {@code null}, an error is
	 * logged and nothing is updated.
	 *
	 * @param countryCode value of the CSV "ISO3" column
	 * @param contractingParty value of the "Contracting Party" column; may be {@code null}
	 * @param membership value of the "Membership" column; may be {@code null}
	 * @param membershipBy value of the "By" column; may be {@code null}
	 */
	protected void updateCountry(String countryCode, String contractingParty, String membership, String membershipBy) {
		final Country country = geoService.findCountry(countryCode);

		if (country == null) {
			LOG.error("No country with ISO3 code={}", countryCode);
			return;
		}

		geoService.updateITPGRFA(country, contractingParty, membership, membershipBy);
	}
}