CountryAlternateNamesUpdater.java

/**
 * Copyright 2014 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.genesys.server.service.worker;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.exceptions.CsvValidationException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.genesys.server.model.impl.Country;
import org.genesys.server.service.CountryNamesUpdater;
import org.genesys.server.service.GeoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/*
 The table 'alternate names' :
 -----------------------------
 0 alternateNameId   : the id of this alternate name, int
 1 geonameid         : geonameId referring to id in table 'geoname', int
 2 isolanguage       : iso 639 language code 2- or 3-characters; 4-characters 'post' for postal codes and 'iata','icao' and faac for airport codes, fr_1793 for French Revolution names,  abbr for abbreviation, link for a website, varchar(7)
 3 alternate name    : alternate name or name variant, varchar(200)
 4 isPreferredName   : '1', if this alternate name is an official/preferred name
 5 isShortName       : '1', if this is a short name like 'California' for 'State of California'
 6 isColloquial      : '1', if this alternate name is a colloquial or slang term
 7 isHistoric        : '1', if this alternate name is historic and was used in the past
 */
/**
 * Refreshes localized country names and Wikipedia links from the GeoNames alternate names dump
 * published at {@link #ALTERNATE_NAMES_URL}.
 * <p>
 * The ZIP archive is downloaded and its {@code alternateNames.txt} entry is read. Rows whose geonameid is a
 * known country refnameId are collected, and one update task per country is submitted to the
 * {@link TaskExecutor}. {@link #updateAlternateNames()} can therefore return before the country records
 * have been updated.
 */
@Component
public class CountryAlternateNamesUpdater implements CountryNamesUpdater {

	public static final String ALTERNATE_NAMES_URL = "https://download.geonames.org/export/dump/alternateNames.zip";

	public static final Logger LOG = LoggerFactory.getLogger(CountryAlternateNamesUpdater.class);

	/** Name of the data file inside the downloaded ZIP archive. */
	private static final String ALTERNATE_NAMES_ENTRY = "alternateNames.txt";

	/** Columns every usable row must have: alternateNameId, geonameid, isolanguage, alternate name. */
	private static final int MIN_COLUMNS = 4;

	@Autowired
	private GeoService geoService;

	@Autowired
	private TaskExecutor taskExecutor;

	@Autowired
	private ObjectMapper objectMapper;

	/**
	 * Download the GeoNames alternate names dump and schedule an update of the names and Wikipedia link
	 * of every country that has rows in it.
	 * <p>
	 * The method returns without updating anything when the server answers with a non-200 status,
	 * the response has no body, or the archive has no {@code alternateNames.txt} entry.
	 *
	 * @throws IOException if the download fails or the dump cannot be read
	 */
	@Override
	public void updateAlternateNames() throws IOException {
		// try-with-resources closes the response as well as the client, releasing the connection
		try (CloseableHttpClient httpclient = HttpClientBuilder.create().build();
				CloseableHttpResponse response = httpclient.execute(new HttpGet(ALTERNATE_NAMES_URL))) {

			final int statusCode = response.getStatusLine().getStatusCode();
			if (statusCode != HttpStatus.SC_OK) {
				LOG.warn("Unexpected HTTP status {} for {}, bailing out", statusCode, ALTERNATE_NAMES_URL);
				return;
			}

			final HttpEntity entity = response.getEntity();
			if (entity == null) {
				LOG.warn("No HttpEntity in response, bailing out");
				return;
			}
			LOG.debug("{} {}", entity.getContentType(), entity.getContentLength());

			try (ZipInputStream zipStream = new ZipInputStream(entity.getContent())) {
				if (seekToEntry(zipStream, ALTERNATE_NAMES_ENTRY)) {
					LOG.info("Found {}", ALTERNATE_NAMES_ENTRY);
					updateFromStream(zipStream);
				} else {
					LOG.warn("Didn't find {}, stopping update.", ALTERNATE_NAMES_ENTRY);
				}
			}

		} catch (final ClientProtocolException e) {
			LOG.error(e.getMessage(), e);
			throw new IOException(e);
		} catch (final IOException e) {
			LOG.error(e.getMessage(), e);
			throw e;
		}
	}

	/**
	 * Advance {@code zipStream} to the entry named {@code entryName}.
	 *
	 * @return {@code true} if the stream is now positioned at that entry's data, {@code false} if the
	 *         archive has no such entry
	 */
	private static boolean seekToEntry(final ZipInputStream zipStream, final String entryName) throws IOException {
		for (ZipEntry entry = zipStream.getNextEntry(); entry != null; entry = zipStream.getNextEntry()) {
			LOG.debug("Got entry: {}", entry.getName());
			if (entryName.equals(entry.getName())) {
				return true;
			}
		}
		return false;
	}

	/**
	 * Read tab-separated alternate name rows from {@code instream} and schedule an update for every
	 * country refnameId that has at least one row.
	 * <p>
	 * Rows are collected per country for the whole dump before any update is scheduled. Each country is
	 * therefore updated once, with all of its names, no matter how the rows are ordered in the dump.
	 * Blank rows, rows with fewer than {@value #MIN_COLUMNS} columns, and rows without a numeric geonameid
	 * are skipped. The stream is not closed here; the caller owns it.
	 *
	 * @param instream UTF-8 encoded, tab-separated alternate names data
	 * @throws IOException if reading the stream fails
	 */
	private void updateFromStream(final InputStream instream) throws IOException {
		// Hash lookup: this set is probed once per row of a dump with millions of rows
		final Set<Long> countryRefnameIds = new HashSet<>(geoService.listCountryRefnameIds());
		LOG.info("Got {} refnameIds", countryRefnameIds.size());

		// Linked so countries are scheduled in the order they first appear in the dump
		final Map<Long, List<String[]>> rowsByCountry = new LinkedHashMap<>();

		final StopWatch stopWatch = new StopWatch();
		stopWatch.start();

		// GeoNames dumps are plain UTF-8 TSV without quoting or escaping, so a split on TAB is exact.
		// A CSV parser with its default quote/escape handling could alter names containing such characters.
		// The reader is intentionally not closed: that would close the caller's stream.
		final BufferedReader reader = new BufferedReader(new InputStreamReader(instream, StandardCharsets.UTF_8));
		int counter = 0;
		String text;
		while ((text = reader.readLine()) != null) {
			counter++;
			if (LOG.isDebugEnabled() && counter % 10000 == 0) {
				LOG.debug("Country alternate names @ line {} in {}ms", counter, stopWatch.getTime());
			}

			// limit -1 keeps trailing empty columns
			final String[] line = text.split("\t", -1);
			if (line.length < MIN_COLUMNS) {
				continue; // blank or malformed row
			}

			final Long refnameId = parseRefnameId(line[1]);
			if (refnameId != null && countryRefnameIds.contains(refnameId)) {
				rowsByCountry.computeIfAbsent(refnameId, id -> new ArrayList<>()).add(normalizeFields(line));
			}
		}

		LOG.info("Scheduling name updates for {} countries", rowsByCountry.size());
		for (final Map.Entry<Long, List<String[]>> entry : rowsByCountry.entrySet()) {
			workIt(entry.getKey(), entry.getValue());
		}

		stopWatch.stop();
		LOG.info("Done importing alternate geonames in {}ms", stopWatch.getTime());
	}

	/**
	 * Parse the geonameid column.
	 *
	 * @return the id, or {@code null} when the value is blank or not a valid long
	 */
	private static Long parseRefnameId(final String value) {
		if (StringUtils.isBlank(value)) {
			return null;
		}
		try {
			return Long.valueOf(value);
		} catch (final NumberFormatException e) {
			return null;
		}
	}

	/**
	 * Replace blank fields and fields containing the literal text {@code "null"} with {@code null}, in place.
	 *
	 * @return the same array, for chaining
	 */
	private static String[] normalizeFields(final String[] fields) {
		for (int i = 0; i < fields.length; i++) {
			if ("null".equals(fields[i]) || StringUtils.isBlank(fields[i])) {
				fields[i] = null;
			}
		}
		return fields;
	}

	/**
	 * Submit an asynchronous update of the country with {@code refnameId} using the given rows.
	 * The rows are copied, so the caller may reuse the list afterwards.
	 */
	private void workIt(final long refnameId, final List<String[]> batch) {
		final List<String[]> batchCopy = new ArrayList<>(batch);
		taskExecutor.execute(() -> updateCountry(refnameId, batchCopy));
	}

	/**
	 * Rebuild the localized names and Wikipedia link of the country with {@code refnameId} from its
	 * alternate name rows, and store them through {@link GeoService}.
	 * <p>
	 * Only non-historic, non-colloquial rows are used for names. For each language code, the last such row
	 * flagged as preferred wins; otherwise the first such row is kept. A "link" row replaces the current wiki
	 * link only if it is preferred or no link is set yet. Entries in the country's {@code nameJCustom} JSON
	 * are applied last and override names taken from GeoNames.
	 */
	private void updateCountry(final long refnameId, final List<String[]> rows) {
		final Country country = geoService.getCountryByRefnameId(refnameId);
		if (country == null) {
			LOG.debug("Nothing to check, no country with refnameId={}", refnameId);
			return;
		}

		final Map<String, Object> customNamesMap = readCustomNames(country);

		LOG.info("Processing country refnames {}", rows.size());
		String wiki = country.getWikiLink();

		final ObjectNode jsonNames = objectMapper.createObjectNode();

		for (final String[] line : rows) {
			if (line.length < MIN_COLUMNS) {
				continue;
			}
			final String language = line[2];
			final String name = line[3];
			if (language == null || name == null) {
				continue; // nothing usable; also prevents a blank link from clearing the wiki link
			}

			final boolean preferred = line.length > 4 && "1".equals(line[4]);
			final boolean colloquial = line.length > 6 && "1".equals(line[6]);
			final boolean historic = line.length > 7 && "1".equals(line[7]);

			if (isLanguageCode(language)) {
				if (!historic && !colloquial && (preferred || jsonNames.get(language) == null)) {
					jsonNames.put(language, name);
				}
			} else if ("link".equals(language) && (preferred || wiki == null)) {
				wiki = name;
			}
		}

		// Manually curated names add to or override the GeoNames ones
		customNamesMap.forEach((key, value) -> jsonNames.put(key, String.valueOf(value)));

		LOG.debug("We are at: {} i18n = {}", country, jsonNames);
		geoService.updateCountryNames(country.getCode3(), jsonNames.toString());
		geoService.updateCountryWiki(country.getCode3(), wiki);
	}

	/**
	 * Parse the country's {@code nameJCustom} JSON object.
	 *
	 * @return the parsed map, or an empty map when the field is blank or cannot be parsed (a warning is logged)
	 */
	private Map<String, Object> readCustomNames(final Country country) {
		final String nameJCustom = country.getNameJCustom();
		if (StringUtils.isBlank(nameJCustom)) {
			return Map.of();
		}
		try {
			return objectMapper.readValue(nameJCustom, new TypeReference<Map<String, Object>>() {});
		} catch (final JsonProcessingException e) {
			LOG.warn("Error while parsing nameJCustom of {}: {}", country, e.getMessage());
			return Map.of();
		}
	}

	/**
	 * Whether the isolanguage column holds a code this updater stores as a name key. Accepted forms are
	 * 2 characters, or 5 characters with '-' at index 2.
	 */
	private static boolean isLanguageCode(final String code) {
		return code.length() == 2 || (code.length() == 5 && code.charAt(2) == '-');
	}
}