WorldClimUpdater.java

/**
 * Copyright 2015 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.genesys.server.service.worker;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.MappedByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.genesys.server.component.aspect.AsAdmin;
import org.genesys.server.model.genesys.Accession;
import org.genesys.server.model.genesys.AccessionId;
import org.genesys.server.model.genesys.QAccessionId;
import org.genesys.server.model.impl.QTileClimate;
import org.genesys.server.model.impl.TileClimate;
import org.genesys.server.persistence.AccessionIdRepository;
import org.genesys.server.persistence.GenesysLowlevelRepository;
import org.genesys.server.service.ClimateDataService;
import org.genesys.server.service.ElasticsearchService;
import org.genesys.server.service.filter.AccessionFilter;
import org.genesys.util.TileIndexCalculator;
import org.genesys.worldclim.WorldClimUtil;
import org.genesys.worldclim.grid.generic.GenericGridFile;
import org.genesys.worldclim.grid.generic.GenericGridZipFile;
import org.genesys.worldclim.grid.generic.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.querydsl.jpa.JPAExpressions;
import com.querydsl.jpa.impl.JPAQueryFactory;

import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;

/**
 * Download data from WorldClim.org and update local cache. Archives are
 * fetched from {@link WorldClimUpdater#WORLDCLIM_ORG_FILEBASE} and extracted
 * into the directory configured by the {@code worldclim.dir} property.
 *
 * @author Matija Obreza
 */
@Component
public class WorldClimUpdater implements InitializingBean {
	public static final Logger LOG = LoggerFactory.getLogger(WorldClimUpdater.class);

	public static final UUID WORLDCLIM_DATASET = UUID.fromString("BC84433B-A626-4BDF-97D3-DB36D79499C6");

	public static String WORLDCLIM_ORG = "https://www.worldclim.org";
	public static String WORLDCLIM_ORG_FILEBASE = "https://geodata.ucdavis.edu/climate/worldclim/1_4/grid/cur/";

	// Reference URLs recorded by the original author (note: a different host than
	// WORLDCLIM_ORG_FILEBASE above):
	// https://biogeo.ucdavis.edu/data/climate/worldclim/1_4/grid/cur/tmin_2-5m_bil.zip
	// https://biogeo.ucdavis.edu/data/climate/worldclim/1_4/grid/cur/tmax_2-5m_bil.zip
	// https://biogeo.ucdavis.edu/data/climate/worldclim/1_4/grid/cur/tmean_2-5m_bil.zip
	// https://biogeo.ucdavis.edu/data/climate/worldclim/1_4/grid/cur/prec_2-5m_bil.zip
	// https://biogeo.ucdavis.edu/data/climate/worldclim/1_4/grid/cur/bio_2-5m_bil.zip
	// https://biogeo.ucdavis.edu/data/climate/worldclim/1_4/grid/cur/alt_2-5m_bil.zip

	/** Suffix shared by every archive name; an archive is {@code prefix + ARCHIVE_SUFFIX}. */
	private static final String ARCHIVE_SUFFIX = "_2-5m_bil.zip";

	/** Archive prefixes fetched by {@link #downloadAll()}, in submission order. */
	private static final List<String> ARCHIVE_PREFIXES = List.of("alt", "prec", "tmin", "tmax", "tmean", "bio");

	/** Prefixes of variables that carry a 1-2 digit numeric suffix (e.g. {@code tmin7}, {@code bio19}). */
	private static final List<String> NUMBERED_PREFIXES = List.of("prec", "tmax", "tmin", "tmean", "bio");

	/** Number of ids handed to the climate data service per call. */
	private static final int BATCH_SIZE = 1000;

	/** Number of retries after the first failed attempt of a batch update. */
	private static final int MAX_RETRIES = 5;

	/** Above this many updated tiles no filtered reindex is triggered; a full reindex is advised instead. */
	private static final int REINDEX_TILE_LIMIT = 5000;

	@Value("${worldclim.dir}")
	private String worldclimDir;

	@Autowired
	private TaskExecutor taskExecutor;

	/** Resolved from {@link #worldclimDir} in {@link #afterPropertiesSet()}. */
	private File worldClimDir;

	@Autowired
	private ClimateDataService climateDataService;

	@Autowired
	@Qualifier("genesysLowlevelRepositoryCustomImpl")
	private GenesysLowlevelRepository genesysLowlevel;

	@Autowired
	private AccessionIdRepository accessionIdRepository;

	@Autowired(required = false)
	private ElasticsearchService elasticsearchService;

	@Autowired
	protected JPAQueryFactory jpaQueryFactory;

	/**
	 * Resolve the configured WorldClim directory and create it if needed. A
	 * directory that cannot be created is reported with a warning instead of
	 * failing bean initialization.
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		this.worldClimDir = new File(worldclimDir);
		// mkdirs() also returns false when the directory already exists, hence the second check
		if (!this.worldClimDir.mkdirs() && !this.worldClimDir.isDirectory()) {
			LOG.warn("WorldClim directory {} does not exist and could not be created", this.worldClimDir.getAbsolutePath());
		}
	}

	/**
	 * Submit one download-and-extract task per WorldClim archive to the task
	 * executor. Failures are logged per archive and do not affect other tasks.
	 */
	@AsAdmin
	public void downloadAll() {
		for (String prefix : ARCHIVE_PREFIXES) {
			final String fileName = prefix + ARCHIVE_SUFFIX;
			taskExecutor.execute(() -> {
				try {
					downloadAndExtract(worldClimDir, fileName);
				} catch (IOException e) {
					// Include the file name: all tasks share this lambda, so the stack trace alone
					// does not tell which archive failed.
					LOG.error("Failed to download and import {}", fileName, e);
				}
			});
		}
	}

	/**
	 * Ensure that local copy of WorldClim data exists and then update
	 * 'variableName' for all accessions with lat/lng. Tile indexes are processed
	 * in batches of {@link #BATCH_SIZE}; a failing batch is retried up to
	 * {@link #MAX_RETRIES} times with a short random pause before the last
	 * failure is rethrown.
	 * 
	 * @param variableName the WorldClim variable to update
	 * @param tileIndexesToUpdate Use `null` to update all indexes in use; an empty
	 *        set makes this call a no-op
	 * @throws IOException when the data files cannot be downloaded or read
	 */
	public void update(String variableName, Set<Long> tileIndexesToUpdate) throws IOException {
		if (tileIndexesToUpdate != null && tileIndexesToUpdate.isEmpty()) {
			return;
		}

		if (!haveData(variableName)) {
			LOG.warn("Missing .bil and .hdr files for {}", variableName);
			downloadAndExtract(worldClimDir, getFileName(variableName));
		}

		LOG.debug("Using worldClim {}", variableName);

		GenericGridFile ggf = new GenericGridFile(worldClimDir, variableName);
		Header header = ggf.readHeader();

		MappedByteBuffer buffer = ggf.mapDataBuffer();

		double factor = WorldClimUtil.getFactor(variableName);

		Set<Long> tileIndexSet = tileIndexesToUpdate != null ? tileIndexesToUpdate : getExistingTileIndexes();

		List<Long> tileIndexes = new ArrayList<>(tileIndexSet);
		for (int fromIndex = 0; fromIndex < tileIndexes.size(); fromIndex += BATCH_SIZE) {
			HashSet<Long> ids = new HashSet<>(tileIndexes.subList(fromIndex, Math.min(fromIndex + BATCH_SIZE, tileIndexes.size())));
			LOG.info("Processing variable {} for tileIndexes: {}-{} of {}", variableName, fromIndex, fromIndex + ids.size(), tileIndexes.size());

			for (int retriesLeft = MAX_RETRIES; retriesLeft >= 0; retriesLeft--) {
				try {
					climateDataService.worldclimUpdate(variableName, ids, buffer, header, factor);
					break; // batch succeeded, leave the retry loop
				} catch (Throwable e) {
					if (retriesLeft == 0) {
						throw e;
					}
					LOG.warn("Problem updating variable {}. {}. Retrying.", variableName, e.getMessage());
					try {
						Thread.sleep(RandomUtils.nextInt(10, 1000));
					} catch (InterruptedException interrupted) {
						// Restore the interrupt flag and stop retrying so the thread can wind down
						Thread.currentThread().interrupt();
						e.addSuppressed(interrupted);
						throw e;
					}
				}
			}
		}

		LOG.info("Done processing {}", variableName);
	}

	/**
	 * Check whether any accession record lacks a tileIndex, logging a warning
	 * with the count when that is the case.
	 * 
	 * @return {@code true} when at least one record has no tileIndex
	 */
	private boolean missingTileIndexes() {
		Set<Long> missingIds = accessionIdRepository.withMissingTileIndex();
		boolean anyMissing = !missingIds.isEmpty();
		if (anyMissing) {
			LOG.warn("{} accession records don't have tileIndex assigned", missingIds.size());
		}
		return anyMissing;
	}

	/**
	 * Calculate and store tile indexes for accessions that do not have one yet.
	 * The work is split into batches of {@link #BATCH_SIZE} ids, each submitted
	 * to the task executor; a failing batch is logged and does not stop others.
	 * 
	 * The declared checked exceptions are kept for the existing callers; the
	 * current body does not throw them.
	 * 
	 * @return set collecting the calculated tile indexes. It is filled by the
	 *         submitted tasks, so it may still be incomplete when this method
	 *         returns if the executor runs tasks asynchronously.
	 */
	private Set<Long> generateTileIndexes() throws JsonParseException, JsonMappingException, IOException {
		// read accessions with lat/lng and update in batch
		final List<Long> accessionIds = new ArrayList<>(accessionIdRepository.withMissingTileIndex());
		LOG.info("Retrieved {} accession ids without tileIndex", accessionIds.size());

		// Written to from several executor tasks, so it must be a thread-safe set
		final Set<Long> tileIndexSet = ConcurrentHashMap.newKeySet(20000);

		for (int fromIndex = 0; fromIndex < accessionIds.size(); fromIndex += BATCH_SIZE) {
			final HashSet<Long> ids = new HashSet<>(accessionIds.subList(fromIndex, Math.min(fromIndex + BATCH_SIZE, accessionIds.size())));

			final int startIndex = fromIndex;
			taskExecutor.execute(() -> {
				try {
					List<AccessionId> toSave = new ArrayList<>(ids.size());
					LOG.debug("Calculating tileIndex at position {} of {}", startIndex, accessionIds.size());
					for (AccessionId aid : accessionIdRepository.findAllById(ids)) {
						Double longitude = aid.getLongitude();
						Double latitude = aid.getLatitude();

						Long tileIndex = WorldClimUtil.getWorldclim25Tile(longitude, latitude);
						Integer tileIndex3min = TileIndexCalculator.get3MinuteTileIndex(longitude, latitude);

						if (tileIndex != null) {
							aid.setTileIndex(tileIndex);
							tileIndexSet.add(tileIndex);
						} else if (aid.getTileIndex() != null) {
							// coordinates no longer resolve to a tile: clear the stale value
							aid.setTileIndex(null);
						}
						if (tileIndex3min != null) {
							aid.setTileIndex3min(tileIndex3min);
						} else if (aid.getTileIndex3min() != null) {
							aid.setTileIndex3min(null);
						}
						toSave.add(aid);
					}
					if (!toSave.isEmpty()) {
						LOG.info("Updating {} accessions' tileIndex at position {} of {}", toSave.size(), startIndex, accessionIds.size());
						climateDataService.updateAccessionTileIndex(toSave);
					}
				} catch (Throwable e) {
					LOG.error(e.getMessage(), e);
				}
			});
		}
		return tileIndexSet;
	}

	/**
	 * @return a mutable copy of the tile indexes reported by the accession id
	 *         repository
	 */
	private Set<Long> getExistingTileIndexes() {
		final Set<Long> tileIndexSet = new HashSet<Long>(20000);

		tileIndexSet.addAll(accessionIdRepository.getTileIndexes());
		return tileIndexSet;
	}

	/**
	 * Map a WorldClim variable name to the archive that contains it.
	 * {@code alt} is matched case-insensitively; the other variables must be a
	 * lower-case prefix followed by one or two digits (e.g. {@code prec3}).
	 * 
	 * @param variableName the WorldClim variable
	 * @return archive file name relative to {@link #WORLDCLIM_ORG_FILEBASE}
	 * @throws FileNotFoundException when the name is {@code null} or matches no
	 *         known variable
	 */
	private String getFileName(String variableName) throws FileNotFoundException {
		if (variableName != null) {
			if ("alt".equalsIgnoreCase(variableName)) {
				return "alt" + ARCHIVE_SUFFIX;
			}
			for (String prefix : NUMBERED_PREFIXES) {
				if (variableName.matches(prefix + "\\d{1,2}")) {
					return prefix + ARCHIVE_SUFFIX;
				}
			}
		}
		throw new FileNotFoundException("No worldclim file for " + variableName);
	}

	/**
	 * @return {@code true} when both the header and the data file of the
	 *         variable exist in the local WorldClim directory
	 */
	private boolean haveData(String variableName) {
		GenericGridFile ggf = new GenericGridFile(worldClimDir, variableName);
		return ggf.getHeaderFile().exists() && ggf.getDataFile().exists();
	}

	/**
	 * Download WorldClim file to a temporary file. The temporary file is removed
	 * again when the download fails for any reason.
	 * 
	 * @param fileName archive name relative to {@link #WORLDCLIM_ORG_FILEBASE}
	 * @return resulting file; the caller is responsible for deleting it
	 * @throws IOException on transport errors, a non-2xx HTTP status or a
	 *         response without a body
	 */
	public File download(String fileName) throws IOException {

		final File tempFile = File.createTempFile("worldclim", ".zip");
		boolean downloaded = false;

		// Closing the client also releases the connection held by the response
		try (CloseableHttpClient httpclient = HttpClientBuilder.create().build()) {

			HttpResponse response = httpclient.execute(new HttpGet(WORLDCLIM_ORG_FILEBASE + fileName));

			// Do not store an error page as if it were the archive
			final int status = response.getStatusLine().getStatusCode();
			if (status < 200 || status >= 300) {
				throw new IOException("Unexpected HTTP status " + status + " downloading " + fileName);
			}

			// Get hold of the response entity
			final HttpEntity entity = response.getEntity();
			if (entity == null) {
				throw new IOException("Empty response downloading " + fileName);
			}
			LOG.debug("{} {}", entity.getContentType(), entity.getContentLength());

			try (InputStream istream = new BufferedInputStream(entity.getContent()); OutputStream ostream = new BufferedOutputStream(new FileOutputStream(tempFile))) {
				LOG.info("Downloading {} {}", fileName, entity.getContentLength());
				IOUtils.copy(istream, ostream);
				ostream.flush();
				LOG.info("Retrieved {}", fileName);
			}
			downloaded = true;

		} catch (final ClientProtocolException e) {
			LOG.error(e.getMessage(), e);
			throw new IOException(e);
		} finally {
			if (!downloaded) {
				deleteTempFile(tempFile);
			}
		}

		return tempFile;
	}

	/**
	 * Download WorldClim file and extract to destination folder. The downloaded
	 * archive is deleted afterwards, whether or not extraction succeeded.
	 * 
	 * @param destination folder to extract into
	 * @param fileName archive name relative to {@link #WORLDCLIM_ORG_FILEBASE}
	 * @return the extracted files
	 * @throws IOException when download or extraction fails
	 */
	public List<File> downloadAndExtract(File destination, String fileName) throws IOException {
		File tempFile = download(fileName);
		try {
			List<File> files = GenericGridZipFile.unzip(tempFile, destination);
			for (File file : files) {
				LOG.info("Extracted {}", file.getCanonicalPath());
			}
			return files;
		} finally {
			deleteTempFile(tempFile);
		}
	}

	/** Delete a temporary download, logging (not throwing) when that fails. */
	private void deleteTempFile(File tempFile) {
		if (!tempFile.delete() && tempFile.exists()) {
			LOG.warn("Could not delete temporary file {}", tempFile.getAbsolutePath());
		}
	}

	/**
	 * Update only tileIndexes in use by {@link AccessionId} that don't have a
	 * record in {@link TileClimate}.
	 * 
	 * Executed 5hrs after startup, then 4hrs between each run.
	 */
	@Scheduled(fixedDelayString = "PT4H", initialDelayString = "PT5H")
	@SchedulerLock(name = "org.genesys.server.service.worker.WorldClimUpdater")
	protected void autoUpdate() {
		LOG.warn("Executing scheduled run of WorldClim auto-update for tiles without data.");

		ensureTileIndexes();

		// Select tile-indexes to load
		var tileIndexesInUse = jpaQueryFactory.from(QAccessionId.accessionId)
			// where we have tile index that is not in TileClimate
			.where(QAccessionId.accessionId.tileIndex.notIn(JPAExpressions.selectFrom(QTileClimate.tileClimate).select(QTileClimate.tileClimate.tileIndex).distinct()))
			// need distinct tile index
			.select(QAccessionId.accessionId.tileIndex).distinct()
			// list
			.fetch();

		updateSelectedTileIndexes(new HashSet<>(tileIndexesInUse));
	}

	/**
	 * Update all WorldClim variables for every tile index in use.
	 */
	public void update() {
		ensureTileIndexes();

		updateSelectedTileIndexes(null);
	}

	/**
	 * Trigger tile index generation when accessions without a tileIndex exist.
	 * 
	 * @throws RuntimeException wrapping the {@link IOException} when generation
	 *         fails
	 */
	private void ensureTileIndexes() {
		if (!missingTileIndexes()) {
			return;
		}
		try {
			generateTileIndexes();
		} catch (IOException e) {
			LOG.error("Error generating missing tileIndexes: {}", e.getMessage());
			throw new RuntimeException("Error generating missing tileIndexes", e);
		}
	}

	/**
	 * Update one variable, logging an {@link IOException} instead of propagating
	 * it so the remaining variables are still processed. Unchecked exceptions
	 * are not caught here.
	 */
	private void updateVariable(String variableName, Set<Long> tileIndexesToUpdate) {
		try {
			update(variableName, tileIndexesToUpdate);
		} catch (IOException e) {
			LOG.error("Error updating worldClim data for variable {}", variableName, e);
		}
	}

	/**
	 * Submit a single task that updates every WorldClim variable for the given
	 * tile indexes and then requests reindexing of accessions whose TileClimate
	 * was modified during the run.
	 * 
	 * @param tileIndexesToUpdate tile indexes to process, {@code null} for all;
	 *        an empty set makes this call a no-op
	 */
	private void updateSelectedTileIndexes(Set<Long> tileIndexesToUpdate) {

		if (tileIndexesToUpdate != null && tileIndexesToUpdate.isEmpty()) {
			// Don't bother
			return;
		}

		var startDate = Instant.now(); // Use this to find tileIndexes modified in this run.
		taskExecutor.execute(() -> {
			LOG.warn("WorldClim variable update started at {} for {} tile indexes", Instant.now(), tileIndexesToUpdate == null ? "ALL" : tileIndexesToUpdate.size());

			updateVariable("alt", tileIndexesToUpdate);

			// monthly variables, 1..12
			for (int month = 1; month <= 12; month++) {
				updateVariable("tmin" + month, tileIndexesToUpdate);
				updateVariable("tmax" + month, tileIndexesToUpdate);
				updateVariable("tmean" + month, tileIndexesToUpdate);
				updateVariable("prec" + month, tileIndexesToUpdate);
			}

			// bio1..bio19
			for (int bio = 1; bio <= 19; bio++) {
				updateVariable("bio" + bio, tileIndexesToUpdate);
			}

			// Schedule indexing of accessions with tileIndexes updated since startDate
			// @Query("select distinct tileIndex from TileClimate where modifiedDate >= ?1")
			var updatedTileIndexes = new HashSet<>(jpaQueryFactory
				// select distinct tile index
				.select(QTileClimate.tileClimate.tileIndex).distinct()
				// from TileClimate
				.from(QTileClimate.tileClimate)
				// where modifiedDate >= startDate
				.where(QTileClimate.tileClimate.modifiedDate.goe(startDate))
				// get
				.fetch());

			LOG.warn("TileClimate for {} records was updated in this run (since {}).", updatedTileIndexes.size(), startDate);

			if (updatedTileIndexes.size() > REINDEX_TILE_LIMIT) {
				LOG.warn("A full reindex of Accessions is in order, we have {} TileClimates with updates.", updatedTileIndexes.size());
			} else if (!updatedTileIndexes.isEmpty()) {
				try {
					var accessionFilter = new AccessionFilter();
					accessionFilter.geo().tileIndex(updatedTileIndexes);
					LOG.warn("Triggering reindexing of accessions using {} TileClimates", updatedTileIndexes.size());
					if (elasticsearchService != null) {
						elasticsearchService.reindex(Accession.class, accessionFilter);
					}
				} catch (Throwable e) {
					LOG.error("Error executing reindex Accession", e);
				}
			}

			LOG.warn("WorldClim update ended at {}", Instant.now());
		});
	}
}