ITPGRFAStatusUpdater.java
/*
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.server.service.worker;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.genesys.server.model.impl.Country;
import org.genesys.server.model.impl.ITPGRFAStatus;
import org.genesys.server.service.GeoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.exceptions.CsvValidationException;
/**
 * Update country ITPGRFA status by fetching data from
 * {@link ITPGRFAStatusUpdater#ITPGRFA_STATUS_URL}.
 *
 * <p>
 * The downloaded CSV must start with the expected header row. Data rows are
 * handed to the {@link TaskExecutor} in batches. Each row with an ISO3 code
 * updates the matching {@link Country} through {@link GeoService}.
 * </p>
 *
 * @author Matija Obreza
 */
@Component
public class ITPGRFAStatusUpdater {

	/**
	 * URL of the ITPGRFA membership CSV export.
	 * NOTE(review): this field is not final; presumably that lets it be overridden (e.g. in tests) — confirm before making it final.
	 */
	public static String ITPGRFA_STATUS_URL = "http://www.fao.org/plant-treaty/countries/membership/export-membership/en/";

	public static final Logger LOG = LoggerFactory.getLogger(ITPGRFAStatusUpdater.class);

	@Autowired
	private GeoService geoService;

	@Autowired
	private TaskExecutor taskExecutor;

	/** Number of CSV rows submitted to the executor per task. */
	private static final int BATCH_SIZE = 20;

	/** Expected leading header columns of the CSV export, in order. */
	private static final String[] CSV_HEADERS = { "Country", "ISO3", "Region", "Contracting Party", "Membership", "By", "Income", "Development", "Signature date",
			"Accession date", "Ratification date", "Approval date", "Acceptance date", "Entry into Force" };

	protected static final int COLUMN_COUNTRY_CODE3 = 1;
	protected static final int COLUMN_CONTRACTING_PARTY = 3;
	protected static final int COLUMN_MEMBERSHIP = 4;
	protected static final int COLUMN_MEMBERSHIP_BY = 5;

	/**
	 * Update local {@link ITPGRFAStatus} entries with data from CSV downloaded
	 * from {@link #ITPGRFA_STATUS_URL}.
	 *
	 * <p>
	 * If the response carries no entity, a warning is logged and nothing is
	 * updated.
	 * </p>
	 *
	 * @throws IOException if the download fails, the server answers with a
	 *             non-2xx status, or the CSV cannot be read or has unexpected
	 *             headers
	 */
	public void downloadAndUpdate() throws IOException {
		final HttpGet httpget = new HttpGet(ITPGRFA_STATUS_URL);
		final CloseableHttpClient httpclient = HttpClientBuilder.create().build();
		InputStream itpgrfaCSVStream = null;
		try {
			final HttpResponse response = httpclient.execute(httpget);

			// Reject error pages up front instead of trying to parse them as CSV
			final int statusCode = response.getStatusLine().getStatusCode();
			if (statusCode < 200 || statusCode >= 300) {
				throw new IOException("Unexpected HTTP status " + statusCode + " fetching " + ITPGRFA_STATUS_URL);
			}

			// Get hold of the response entity
			final HttpEntity entity = response.getEntity();
			if (entity == null) {
				LOG.warn("No HttpEntity in response, bailing out");
				return;
			}
			LOG.debug("{} {}", entity.getContentType(), entity.getContentLength());

			itpgrfaCSVStream = new BufferedInputStream(entity.getContent());
			updateFromStream(itpgrfaCSVStream);

		} catch (final ClientProtocolException e) {
			LOG.error(e.getMessage(), e);
			throw new IOException(e);
		} catch (final IOException e) {
			LOG.error(e.getMessage(), e);
			throw e;
		} finally {
			// Closing the client also releases the connection used by the response
			IOUtils.closeQuietly(itpgrfaCSVStream);
			IOUtils.closeQuietly(httpclient);
		}
	}

	/**
	 * Parse the CSV stream, validate its header row and queue data rows for
	 * processing in batches of {@link #BATCH_SIZE}.
	 *
	 * <p>
	 * Cells that are blank or contain the literal text {@code null} are
	 * converted to {@code null} before the row is queued.
	 * </p>
	 *
	 * @param instream CSV content; read to the end and closed by this method
	 * @throws IOException if the stream is empty, the headers do not match
	 *             {@link #CSV_HEADERS}, or a CSV line is malformed
	 */
	private void updateFromStream(final InputStream instream) throws IOException {
		final var parser = new CSVParserBuilder()
			.withSeparator(',')
			.withQuoteChar('"')
			.withEscapeChar('\\')
			.withStrictQuotes(false)
			.withIgnoreQuotations(false)
			.withIgnoreLeadingWhiteSpace(true)
			.build();

		// Decode explicitly as UTF-8 rather than the platform default charset,
		// so non-ASCII country names decode the same on every JVM.
		// NOTE(review): assumes the FAO export is UTF-8 encoded — confirm.
		final CSVReader reader = new CSVReaderBuilder(new BufferedReader(new InputStreamReader(instream, StandardCharsets.UTF_8)))
			.withCSVParser(parser)
			.build();

		try {
			validateHeaders(reader.readNext());

			final List<String[]> batch = new ArrayList<>(BATCH_SIZE);

			// Timer (measures parsing and task submission; the updates themselves run asynchronously)
			final StopWatch stopWatch = new StopWatch();
			stopWatch.start();

			String[] line;
			while ((line = reader.readNext()) != null) {
				for (int i = 0; i < line.length; i++) {
					if ("null".equals(line[i]) || StringUtils.isBlank(line[i])) {
						line[i] = null;
					}
				}

				if (LOG.isDebugEnabled()) {
					LOG.debug(">>> {}", ArrayUtils.toString(line, "NULL"));
				}

				batch.add(line);

				if (batch.size() >= BATCH_SIZE) {
					workIt(batch);
					batch.clear();
				}
			}

			if (!batch.isEmpty()) {
				LOG.debug("Have items in the batch after loop.");
				workIt(batch);
				batch.clear();
			}

			stopWatch.stop();
			LOG.info("Done importing ITPGRFA status in {}ms", stopWatch.getTime());

		} catch (final CsvValidationException e) {
			throw new IOException(e);
		} finally {
			IOUtils.closeQuietly(reader);
		}
	}

	/**
	 * Verify that the header row starts with {@link #CSV_HEADERS}, in order.
	 * Extra trailing columns are allowed.
	 *
	 * @param headers the first CSV record, or {@code null} if the stream was
	 *            empty
	 * @throws IOException if there is no header row or it does not match
	 */
	private static void validateHeaders(final String[] headers) throws IOException {
		if (headers == null) {
			throw new IOException("CSV is empty, no header row found");
		}

		LOG.info("Got headers: {}", ArrayUtils.toString(headers));

		if (headers.length < CSV_HEADERS.length) {
			throw new IOException("CSV header count mismatch. Found: " + ArrayUtils.toString(headers));
		}
		for (int i = 0; i < CSV_HEADERS.length; i++) {
			if (!CSV_HEADERS[i].equals(headers[i])) {
				throw new IOException("CSV header mismatch, found '" + headers[i] + "' instead of '" + CSV_HEADERS[i] + "'");
			}
		}
	}

	/**
	 * Submit a batch of CSV rows to the {@link TaskExecutor}.
	 *
	 * @param batch rows to process; copied here because the caller clears and
	 *            reuses the list after this call returns
	 */
	private void workIt(final List<String[]> batch) {
		// Need copy! The caller clears the list before the task runs.
		final List<String[]> batchCopy = new ArrayList<>(batch);

		taskExecutor.execute(() -> {
			for (final String[] line : batchCopy) {
				processRow(line);
			}
		});
	}

	/**
	 * Apply a single CSV row. Rows that are too short or have no ISO3 code are
	 * skipped. A failure for one country is logged so the remaining rows of
	 * the batch are still processed.
	 *
	 * @param line a data row with blank cells already converted to
	 *            {@code null}
	 */
	private void processRow(final String[] line) {
		if (LOG.isDebugEnabled()) {
			LOG.debug("Working on {}", ArrayUtils.toString(line, "NULL"));
		}

		// A blank CSV line yields a one-element record; short rows would
		// otherwise throw ArrayIndexOutOfBoundsException and abort the batch
		if (line.length <= COLUMN_MEMBERSHIP_BY) {
			if (line.length > COLUMN_COUNTRY_CODE3 && !StringUtils.isBlank(line[COLUMN_COUNTRY_CODE3])) {
				LOG.warn("Skipping incomplete ITPGRFA row: {}", ArrayUtils.toString(line, "NULL"));
			}
			return;
		}

		if (StringUtils.isBlank(line[COLUMN_COUNTRY_CODE3])) {
			return;
		}

		try {
			updateCountry(line[COLUMN_COUNTRY_CODE3], line[COLUMN_CONTRACTING_PARTY], line[COLUMN_MEMBERSHIP], line[COLUMN_MEMBERSHIP_BY]);
		} catch (final RuntimeException e) {
			LOG.error("Failed to update ITPGRFA status for country code={}: {}", line[COLUMN_COUNTRY_CODE3], e.getMessage(), e);
		}
	}

	/**
	 * Look up the country by code and update its ITPGRFA status.
	 *
	 * @param countryCode code from the ISO3 CSV column, passed to
	 *            {@link GeoService#findCountry}
	 * @param contractingParty value of the "Contracting Party" column, may be
	 *            {@code null}
	 * @param membership value of the "Membership" column, may be {@code null}
	 * @param membershipBy value of the "By" column, may be {@code null}
	 */
	protected void updateCountry(String countryCode, String contractingParty, String membership, String membershipBy) {
		final Country country = geoService.findCountry(countryCode);
		if (country == null) {
			LOG.error("No country with code={}", countryCode);
			return;
		}

		geoService.updateITPGRFA(country, contractingParty, membership, membershipBy);
	}
}