WorldClimUpdater.java
/**
* Copyright 2015 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.genesys.server.service.worker;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.MappedByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.genesys.server.component.aspect.AsAdmin;
import org.genesys.server.model.genesys.Accession;
import org.genesys.server.model.genesys.AccessionId;
import org.genesys.server.model.genesys.QAccessionId;
import org.genesys.server.model.impl.QTileClimate;
import org.genesys.server.model.impl.TileClimate;
import org.genesys.server.persistence.AccessionIdRepository;
import org.genesys.server.persistence.GenesysLowlevelRepository;
import org.genesys.server.service.ClimateDataService;
import org.genesys.server.service.ElasticsearchService;
import org.genesys.server.service.filter.AccessionFilter;
import org.genesys.util.TileIndexCalculator;
import org.genesys.worldclim.WorldClimUtil;
import org.genesys.worldclim.grid.generic.GenericGridFile;
import org.genesys.worldclim.grid.generic.GenericGridZipFile;
import org.genesys.worldclim.grid.generic.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.querydsl.jpa.JPAExpressions;
import com.querydsl.jpa.impl.JPAQueryFactory;

import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
/**
 * Downloads WorldClim 1.4 {@code 2-5m} grid archives from {@link #WORLDCLIM_ORG_FILEBASE}
 * (project home page: {@link #WORLDCLIM_ORG}), keeps an extracted local copy in the
 * {@code worldclim.dir} directory and updates climate data for the tile indexes used by
 * {@link AccessionId} records.
 *
 * @author Matija Obreza
 */
@Component
public class WorldClimUpdater implements InitializingBean {
    public static final Logger LOG = LoggerFactory.getLogger(WorldClimUpdater.class);

    public static final UUID WORLDCLIM_DATASET = UUID.fromString("BC84433B-A626-4BDF-97D3-DB36D79499C6");

    /** WorldClim home page. NOTE(review): non-final; make final if nothing reassigns it. */
    public static String WORLDCLIM_ORG = "https://www.worldclim.org";

    /**
     * Base URL of the WorldClim 1.4 grid archives. Archive names are
     * {@code <group>_2-5m_bil.zip} for the groups alt, prec, tmin, tmax, tmean and bio
     * (e.g. {@code tmin_2-5m_bil.zip}). The same files were previously served from
     * {@code https://biogeo.ucdavis.edu/data/climate/worldclim/1_4/grid/cur/}.
     * NOTE(review): non-final; make final if nothing reassigns it.
     */
    public static String WORLDCLIM_ORG_FILEBASE = "https://geodata.ucdavis.edu/climate/worldclim/1_4/grid/cur/";

    /** Suffix shared by every archive name downloaded by this class. */
    private static final String FILE_SUFFIX = "_2-5m_bil.zip";

    /** Archive name prefixes, in the order {@link #downloadAll()} schedules them. */
    private static final List<String> VARIABLE_GROUPS = List.of("alt", "prec", "tmin", "tmax", "tmean", "bio");

    /** Numbered variable names (e.g. {@code prec1}, {@code bio19}); group 1 is the archive prefix. */
    private static final Pattern NUMBERED_VARIABLE = Pattern.compile("(prec|tmax|tmin|tmean|bio)\\d{1,2}");

    /** Number of ids handled per database batch. */
    private static final int BATCH_SIZE = 1000;

    /** Retries after the first failed attempt of a batch climate update (so 6 attempts in total). */
    private static final int MAX_RETRIES = 5;

    /** Above this many updated tiles, targeted reindexing is skipped and a full reindex is recommended. */
    private static final int FULL_REINDEX_THRESHOLD = 5000;

    @Value("${worldclim.dir}")
    private String worldclimDir;

    @Autowired
    private TaskExecutor taskExecutor;

    /** {@link #worldclimDir} as a {@link File}; set in {@link #afterPropertiesSet()}. */
    private File worldClimDir;

    @Autowired
    private ClimateDataService climateDataService;

    @Autowired
    @Qualifier("genesysLowlevelRepositoryCustomImpl")
    private GenesysLowlevelRepository genesysLowlevel;

    @Autowired
    private AccessionIdRepository accessionIdRepository;

    @Autowired(required = false)
    private ElasticsearchService elasticsearchService;

    @Autowired
    protected JPAQueryFactory jpaQueryFactory;

    /**
     * Resolves the configured WorldClim directory and creates it if needed.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.worldClimDir = new File(worldclimDir);
        // mkdirs() also returns false when the directory already exists, so double-check
        if (!this.worldClimDir.mkdirs() && !this.worldClimDir.isDirectory()) {
            LOG.warn("Could not create WorldClim directory {}", this.worldClimDir.getAbsolutePath());
        }
    }

    /**
     * Schedules a download-and-extract of every WorldClim archive on the task executor,
     * one task per archive. Failures are logged per archive.
     */
    @AsAdmin
    public void downloadAll() {
        for (final String group : VARIABLE_GROUPS) {
            final String fileName = group + FILE_SUFFIX;
            taskExecutor.execute(() -> {
                try {
                    downloadAndExtract(worldClimDir, fileName);
                } catch (IOException e) {
                    LOG.error("Failed to download and import", e);
                }
            });
        }
    }

    /**
     * Ensures that a local copy of the WorldClim data exists, then updates
     * {@code variableName} for the given tile indexes.
     *
     * @param variableName the WorldClim variable to update (e.g. {@code alt}, {@code tmin1}, {@code bio19})
     * @param tileIndexesToUpdate tile indexes to update; {@code null} updates all indexes in use, an
     *            empty set is a no-op
     * @throws IOException if the data cannot be downloaded or read. A batch that still fails after
     *             all retries rethrows the last error.
     */
    public void update(String variableName, Set<Long> tileIndexesToUpdate) throws IOException {
        if (tileIndexesToUpdate != null && tileIndexesToUpdate.isEmpty()) {
            return;
        }

        if (!haveData(variableName)) {
            LOG.warn("Missing .bil and .hdr files for {}", variableName);
            downloadAndExtract(worldClimDir, getFileName(variableName));
        }

        LOG.debug("Using worldClim {}", variableName);
        GenericGridFile ggf = new GenericGridFile(worldClimDir, variableName);
        Header header = ggf.readHeader();
        MappedByteBuffer buffer = ggf.mapDataBuffer();
        double factor = WorldClimUtil.getFactor(variableName);

        Set<Long> tileIndexSet = tileIndexesToUpdate != null ? tileIndexesToUpdate : getExistingTileIndexes();
        List<Long> tileIndexes = new ArrayList<Long>(tileIndexSet);

        for (int fromIndex = 0; fromIndex < tileIndexes.size(); fromIndex += BATCH_SIZE) {
            HashSet<Long> ids = new HashSet<Long>(tileIndexes.subList(fromIndex, Math.min(fromIndex + BATCH_SIZE, tileIndexes.size())));
            LOG.info("Processing variable {} for tileIndexes: {}-{} of {}", variableName, fromIndex, fromIndex + ids.size(), tileIndexes.size());

            // One attempt plus MAX_RETRIES retries, with a short random back-off in between
            for (int retriesLeft = MAX_RETRIES; retriesLeft >= 0; retriesLeft--) {
                try {
                    climateDataService.worldclimUpdate(variableName, ids, buffer, header, factor);
                    break; // batch done, stop retrying
                } catch (Throwable e) {
                    if (retriesLeft == 0) {
                        throw e;
                    }
                    LOG.warn("Problem updating variable {}. {}. Retrying.", variableName, e.getMessage());
                    try {
                        Thread.sleep(RandomUtils.nextInt(10, 1000));
                    } catch (InterruptedException interrupted) {
                        // Keep the interrupt visible to the executor and stop retrying
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                }
            }
        }
        LOG.info("Done processing {}", variableName);
    }

    /**
     * @return {@code true} if some accession records have no tileIndex assigned (also logs a warning)
     */
    private boolean missingTileIndexes() {
        final int missingCount = accessionIdRepository.withMissingTileIndex().size();
        if (missingCount > 0) {
            LOG.warn("{} accession records don't have tileIndex assigned", missingCount);
        }
        return missingCount > 0;
    }

    /**
     * Generates tile indexes if any accession is missing one. Shared by {@link #autoUpdate()}
     * and {@link #update()}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if generation fails
     */
    private void ensureTileIndexes() {
        if (missingTileIndexes()) {
            try {
                generateTileIndexes();
            } catch (IOException e) {
                LOG.error("Error generating missing tileIndexes: {}", e.getMessage());
                throw new RuntimeException("Error generating missing tileIndexes", e);
            }
        }
    }

    /**
     * Schedules the computation of {@code tileIndex} and {@code tileIndex3min} for accessions
     * without a tileIndex, in batches, on the task executor.
     *
     * @return the set the batches add their tile indexes to. It may still be filling up when this
     *         method returns, because the batches run on the task executor.
     */
    private Set<Long> generateTileIndexes() throws JsonParseException, JsonMappingException, IOException {
        // read accessions with lat/lng and update in batch
        final List<Long> accessionIds = new ArrayList<>(accessionIdRepository.withMissingTileIndex());
        LOG.info("Retrieved {} accession ids without tileIndex", accessionIds.size());

        // Written by executor tasks that may run concurrently, so it must be thread-safe
        final Set<Long> tileIndexSet = ConcurrentHashMap.newKeySet(20000);
        final int total = accessionIds.size();

        for (int fromIndex = 0; fromIndex < total; fromIndex += BATCH_SIZE) {
            final HashSet<Long> ids = new HashSet<Long>(accessionIds.subList(fromIndex, Math.min(fromIndex + BATCH_SIZE, total)));
            final int startIndex = fromIndex;
            taskExecutor.execute(() -> assignTileIndexes(ids, startIndex, total, tileIndexSet));
        }
        return tileIndexSet;
    }

    /**
     * Computes and saves {@code tileIndex} and {@code tileIndex3min} for one batch of accessions.
     * Runs on the task executor. Errors are logged, not propagated.
     *
     * @param ids accession ids in this batch
     * @param startIndex position of this batch in the overall id list (for logging)
     * @param total total number of ids being processed (for logging)
     * @param tileIndexSet thread-safe collector for every non-null tileIndex computed
     */
    private void assignTileIndexes(HashSet<Long> ids, int startIndex, int total, Set<Long> tileIndexSet) {
        try {
            List<AccessionId> toSave = new ArrayList<AccessionId>(ids.size());
            LOG.debug("Calculating tileIndex at position {} of {}", startIndex, total);

            for (AccessionId aid : accessionIdRepository.findAllById(ids)) {
                Double longitude = aid.getLongitude();
                Double latitude = aid.getLatitude();
                Long tileIndex = WorldClimUtil.getWorldclim25Tile(longitude, latitude);
                Integer tileIndex3min = TileIndexCalculator.get3MinuteTileIndex(longitude, latitude);

                if (tileIndex != null) {
                    aid.setTileIndex(tileIndex);
                    tileIndexSet.add(tileIndex);
                } else if (aid.getTileIndex() != null) {
                    aid.setTileIndex(null);
                }

                if (tileIndex3min != null) {
                    aid.setTileIndex3min(tileIndex3min);
                } else if (aid.getTileIndex3min() != null) {
                    aid.setTileIndex3min(null);
                }

                toSave.add(aid);
            }

            if (!toSave.isEmpty()) {
                LOG.info("Updating {} accessions' tileIndex at position {} of {}", toSave.size(), startIndex, total);
                climateDataService.updateAccessionTileIndex(toSave);
            }
        } catch (Throwable e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * @return a mutable copy of all tile indexes currently in use by accessions
     */
    private Set<Long> getExistingTileIndexes() {
        final Set<Long> tileIndexSet = new HashSet<Long>(20000);
        tileIndexSet.addAll(accessionIdRepository.getTileIndexes());
        return tileIndexSet;
    }

    /**
     * Maps a WorldClim variable name to the archive that contains it.
     *
     * @param variableName {@code alt} (any case) or {@code prec|tmax|tmin|tmean|bio} followed by 1-2 digits
     * @return archive file name, e.g. {@code tmin_2-5m_bil.zip}
     * @throws FileNotFoundException if no archive is known for the variable
     */
    private String getFileName(String variableName) throws FileNotFoundException {
        if ("alt".equalsIgnoreCase(variableName)) {
            return "alt" + FILE_SUFFIX;
        }
        final Matcher matcher = NUMBERED_VARIABLE.matcher(variableName);
        if (matcher.matches()) {
            return matcher.group(1) + FILE_SUFFIX;
        }
        throw new FileNotFoundException("No worldclim file for " + variableName);
    }

    /**
     * @return {@code true} if both the header and data files for the variable exist locally
     */
    private boolean haveData(String variableName) {
        GenericGridFile ggf = new GenericGridFile(worldClimDir, variableName);
        return ggf.getHeaderFile().exists() && ggf.getDataFile().exists();
    }

    /**
     * Downloads a WorldClim file to a temporary file. The caller is responsible for deleting it.
     *
     * @param fileName archive name relative to {@link #WORLDCLIM_ORG_FILEBASE}
     * @return the temporary file holding the downloaded content
     * @throws IOException if the request fails, the server does not answer with a 2xx status or the
     *             response has no body. The temporary file is deleted in that case.
     */
    public File download(String fileName) throws IOException {
        final String url = WORLDCLIM_ORG_FILEBASE + fileName;
        final File tempFile = File.createTempFile("worldclim", ".zip");
        boolean downloaded = false;

        try (CloseableHttpClient httpclient = HttpClientBuilder.create().build()) {
            HttpResponse response = httpclient.execute(new HttpGet(url));

            // Without this check an HTML error page would be saved and fail much later in unzip
            final int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode < 200 || statusCode >= 300) {
                throw new IOException("Unexpected HTTP status " + statusCode + " when downloading " + url);
            }

            // Get hold of the response entity
            final HttpEntity entity = response.getEntity();
            if (entity == null) {
                throw new IOException("Empty response when downloading " + url);
            }
            LOG.debug("{} {}", entity.getContentType(), entity.getContentLength());

            try (InputStream istream = new BufferedInputStream(entity.getContent()); OutputStream ostream = new BufferedOutputStream(new FileOutputStream(tempFile))) {
                LOG.info("Downloading {} {}", fileName, entity.getContentLength());
                IOUtils.copy(istream, ostream);
                ostream.flush();
                LOG.info("Retrieved {}", fileName);
            }
            downloaded = true;
        } catch (final ClientProtocolException e) {
            LOG.error(e.getMessage(), e);
            throw new IOException(e);
        } finally {
            // Do not leak partial or failed downloads, whatever the failure was
            if (!downloaded) {
                tempFile.delete();
            }
        }
        return tempFile;
    }

    /**
     * Downloads a WorldClim file and extracts it to the destination folder. The downloaded
     * archive is always deleted afterwards.
     *
     * @param destination folder to extract into
     * @param fileName archive name relative to {@link #WORLDCLIM_ORG_FILEBASE}
     * @return the extracted files
     * @throws IOException if download or extraction fails
     */
    public List<File> downloadAndExtract(File destination, String fileName) throws IOException {
        File tempFile = download(fileName);
        try {
            List<File> files = GenericGridZipFile.unzip(tempFile, destination);
            for (File file : files) {
                LOG.info("Extracted {}", file.getCanonicalPath());
            }
            return files;
        } finally {
            tempFile.delete();
        }
    }

    /**
     * Updates only the tileIndexes used by {@link AccessionId} that have no record in
     * {@link TileClimate}.
     *
     * Executed 5hrs after startup, then 4hrs between each run.
     */
    @Scheduled(fixedDelayString = "PT4H", initialDelayString = "PT5H")
    @SchedulerLock(name = "org.genesys.server.service.worker.WorldClimUpdater")
    protected void autoUpdate() {
        LOG.warn("Executing scheduled run of WorldClim auto-update for tiles without data.");
        // NOTE(review): if taskExecutor is asynchronous, tile indexes generated here may not be
        // visible to the query below until a later run.
        ensureTileIndexes();

        // Distinct tile indexes in use by accessions that are not yet in TileClimate
        var tileIndexesInUse = jpaQueryFactory.from(QAccessionId.accessionId)
            .where(QAccessionId.accessionId.tileIndex.notIn(JPAExpressions.selectFrom(QTileClimate.tileClimate).select(QTileClimate.tileClimate.tileIndex).distinct()))
            .select(QAccessionId.accessionId.tileIndex).distinct()
            .fetch();

        updateSelectedTileIndexes(new HashSet<>(tileIndexesInUse));
    }

    /**
     * Updates all WorldClim variables for all tile indexes in use.
     */
    public void update() {
        ensureTileIndexes();
        updateSelectedTileIndexes(null);
    }

    /**
     * Schedules one task on the executor that updates every WorldClim variable for the
     * given tiles, then triggers reindexing of the affected accessions.
     *
     * @param tileIndexesToUpdate tiles to update; {@code null} means all, empty is a no-op
     */
    private void updateSelectedTileIndexes(Set<Long> tileIndexesToUpdate) {
        if (tileIndexesToUpdate != null && tileIndexesToUpdate.isEmpty()) {
            // Don't bother
            return;
        }

        var startDate = Instant.now(); // Use this to find tileIndexes modified in this run.
        taskExecutor.execute(() -> {
            LOG.warn("WorldClim variable update started at {} for {} tile indexes", Instant.now(), tileIndexesToUpdate == null ? "ALL" : tileIndexesToUpdate.size());

            updateVariable("alt", tileIndexesToUpdate);
            for (int i = 1; i <= 12; i++) {
                updateVariable("tmin" + i, tileIndexesToUpdate);
                updateVariable("tmax" + i, tileIndexesToUpdate);
                updateVariable("tmean" + i, tileIndexesToUpdate);
                updateVariable("prec" + i, tileIndexesToUpdate);
            }
            for (int i = 1; i <= 19; i++) {
                updateVariable("bio" + i, tileIndexesToUpdate);
            }

            reindexAccessionsUpdatedSince(startDate);
            LOG.warn("WorldClim update ended at {}", Instant.now());
        });
    }

    /**
     * Runs {@link #update(String, Set)} for one variable and logs (rather than propagates) an
     * {@link IOException}, so the remaining variables are still processed.
     */
    private void updateVariable(String variableName, Set<Long> tileIndexesToUpdate) {
        try {
            update(variableName, tileIndexesToUpdate);
        } catch (IOException e) {
            LOG.error("Error updating worldClim data for variable {}", variableName, e);
        }
    }

    /**
     * Schedules reindexing of accessions whose tileIndexes have {@link TileClimate} records
     * modified at or after {@code startDate}. If more than {@link #FULL_REINDEX_THRESHOLD} tiles
     * changed, it only logs that a full reindex is needed.
     */
    private void reindexAccessionsUpdatedSince(Instant startDate) {
        // @Query("select distinct tileIndex from TileClimate where modifiedDate >= ?1")
        var updatedTileIndexes = new HashSet<>(jpaQueryFactory
            .select(QTileClimate.tileClimate.tileIndex).distinct()
            .from(QTileClimate.tileClimate)
            .where(QTileClimate.tileClimate.modifiedDate.goe(startDate))
            .fetch());

        LOG.warn("TileClimate for {} records was updated in this run (since {}).", updatedTileIndexes.size(), startDate);
        if (updatedTileIndexes.isEmpty()) {
            return;
        }
        if (updatedTileIndexes.size() > FULL_REINDEX_THRESHOLD) {
            LOG.warn("A full reindex of Accessions is in order, we have {} TileClimates with updates.", updatedTileIndexes.size());
            return;
        }

        try {
            var accessionFilter = new AccessionFilter();
            accessionFilter.geo().tileIndex(updatedTileIndexes);
            LOG.warn("Triggering reindexing of accessions using {} TileClimates", updatedTileIndexes.size());
            if (elasticsearchService != null) {
                elasticsearchService.reindex(Accession.class, accessionFilter);
            }
        } catch (Throwable e) {
            LOG.error("Error executing reindex Accession", e);
        }
    }
}