ScheduledGLISUpdater.java
/*
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.server.service.worker;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Objects;
import javax.persistence.EntityManager;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys.glis.v1.TermConstants;
import org.genesys.glis.v1.api.GenesysApi;
import org.genesys.glis.v1.model.UpdateTargets;
import org.genesys.glis.v1.model.UpdatedTarget;
import org.genesys.server.model.genesys.Accession;
import org.genesys.server.model.genesys.QAccession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.support.Querydsl;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.core.types.dsl.PathBuilderFactory;
import com.querydsl.jpa.JPQLQuery;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
/**
 * Notifies GLIS about accessions that have a DOI and were modified recently, registering
 * Genesys as the passport-data target for those DOIs.
 *
 * The scheduled run looks back {@value #LOOKBACK_HOURS} hours. It first runs 1 hour after
 * startup, and then 3 hours after the previous run completed. The look-back window is longer
 * than the delay, so consecutive windows overlap. ShedLock keeps the task running on only one
 * cluster node at a time.
 *
 * @author Matija Obreza
 * @author Maxym Borodenko
 */
@Component
public class ScheduledGLISUpdater {

	/** Number of DOIs loaded from the database and sent to GLIS per request. */
	private static final int BATCH_SIZE = 100;

	/** How many times a single batch is sent to GLIS before giving up on it. */
	private static final int MAX_ATTEMPTS = 6;

	/** How far back (in hours) the scheduled run looks for modified accessions. */
	private static final long LOOKBACK_HOURS = 4;

	/** The Constant LOG. */
	public static final Logger LOG = LoggerFactory.getLogger(ScheduledGLISUpdater.class);

	/** The GLIS Genesys API; {@code null} when no GLIS client is configured. */
	@Autowired(required = false)
	private GenesysApi glisGenesys;

	/** The entity manager. */
	@Autowired
	private EntityManager em;

	/**
	 * Scheduled entry point: sends GLIS the DOIs of accessions modified in the last
	 * {@value #LOOKBACK_HOURS} hours. Does nothing when no GLIS client is configured.
	 */
	@Scheduled(initialDelay = 1, fixedDelay = 3, timeUnit = java.util.concurrent.TimeUnit.HOURS) // 3 hours after the previous run completes
	@SchedulerLock(name = "org.genesys.server.service.worker.ScheduledGLISUpdater", lockAtLeastFor = "PT1H", lockAtMostFor = "PT1H15M") // Lock held for at least 1h, at most 1h15m
	public void runGLISUpdater() {
		if (glisGenesys == null) {
			LOG.warn("GLIS Client not available. Not initializing GLIS Updater.");
			return;
		}
		Instant since = Instant.now().minus(LOOKBACK_HOURS, ChronoUnit.HOURS);
		LOG.warn("Starting scheduled GLIS update with updates since {}", since);
		StopWatch stopWatch = StopWatch.createStarted();
		// Self-invocation: with Spring's default proxy-based method security, the @PreAuthorize
		// on notifyGLIS is not evaluated for this call, so it runs without a security context.
		notifyGLIS(since);
		stopWatch.stop();
		LOG.warn("Scheduled GLIS update took {}ms", stopWatch.getTime());
	}

	/**
	 * Sends GLIS the DOIs of all accessions that have a DOI and whose {@code lastChangedDate}
	 * is after {@code date}. DOIs are loaded in pages of {@value #BATCH_SIZE}, ordered by
	 * accession id, and each page is sent to GLIS as one request.
	 *
	 * @param date only accessions modified after this instant are included; must not be null
	 * @throws NullPointerException if {@code date} is null
	 */
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public void notifyGLIS(Instant date) {
		Objects.requireNonNull(date, "date must not be null");
		if (glisGenesys == null) {
			// Public entry point: guard here too, otherwise every batch would fail and be retried.
			LOG.warn("GLIS Client not available. Not updating GLIS.");
			return;
		}
		LOG.info("Starting GLIS update with updates since {}", date);

		PathBuilder<Accession> builder = new PathBuilderFactory().create(Accession.class);
		Querydsl querydsl = new Querydsl(em, builder);
		JPQLQuery<String> query = querydsl.createQuery(QAccession.accession)
			// select DOI only
			.select(QAccession.accession.doi)
			.where(QAccession.accession.lastChangedDate.after(date).and(QAccession.accession.doi.isNotNull()))
			// a stable order is required for offset-based paging
			.orderBy(QAccession.accession.id.asc())
			.limit(BATCH_SIZE);

		int startPosition = 0;
		List<String> results;
		StopWatch stopWatch = StopWatch.createStarted();
		do {
			results = query.offset(startPosition).fetch();
			// processBatch ignores an empty page
			processBatch(results);
			startPosition += results.size();
			// Clear anything cached in the entity manager
			em.clear();
			// A short page means there is nothing left to fetch
		} while (results.size() == BATCH_SIZE);
		stopWatch.stop();
		LOG.warn("GLIS update of {} DOIs took {}ms", startPosition, stopWatch.getTime());
	}

	/**
	 * Registers Genesys as the passport-data target for the given DOIs in GLIS. A failed
	 * request is retried up to {@value #MAX_ATTEMPTS} attempts in total; after the last
	 * failure the error is logged and the batch is skipped.
	 *
	 * @param accessionDOIs DOIs to send; nothing is sent when null or empty
	 */
	private void processBatch(List<String> accessionDOIs) {
		if (CollectionUtils.isEmpty(accessionDOIs)) {
			return;
		}
		UpdateTargets targets = new UpdateTargets();
		targets.addKwsItem(TermConstants.PASSPORT_DATA);
		targets.setDois(accessionDOIs);

		for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
			List<UpdatedTarget> glisResponse;
			try {
				LOG.info("Updating GLIS for {} accessions with DOIs", targets.getDois().size());
				glisResponse = glisGenesys.registerGenesysAsTarget(targets);
			} catch (Exception e) {
				// JVM Errors (e.g. OutOfMemoryError) are deliberately not caught or retried.
				if (attempt == MAX_ATTEMPTS) {
					LOG.error("Error updating GLIS targets: {}", e.getMessage(), e);
				} else {
					LOG.warn("Problem updating GLIS targets with DOIs: {}. {}. Retrying.", targets.getDois(), e.getMessage());
				}
				continue;
			}
			// Inspect the response outside the try block so that a problem while logging it
			// never causes an already-accepted request to be sent again.
			logResponse(glisResponse);
			return;
		}
	}

	/**
	 * Logs the per-DOI outcome reported by GLIS. "OK" results are logged at trace level.
	 * Any other result is logged at info level together with the full response entry.
	 *
	 * @param glisResponse the GLIS response; ignored when null
	 */
	private static void logResponse(List<UpdatedTarget> glisResponse) {
		if (glisResponse == null) {
			return;
		}
		for (UpdatedTarget updatedTarget : glisResponse) {
			if (StringUtils.equals("OK", updatedTarget.getResult())) {
				LOG.trace("GLIS for {} result={}", updatedTarget.getDoi(), updatedTarget.getResult());
			} else {
				LOG.info("GLIS for {} result={} response={}", updatedTarget.getDoi(), updatedTarget.getResult(), updatedTarget);
			}
		}
	}
}