AccessionCounter.java
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.server.service.worker;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.genesys.server.model.impl.FaoInstitute;
import org.genesys.server.service.GenesysService;
import org.genesys.server.service.InstituteService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
 * Defers and coalesces {@link FaoInstitute} accession recounts.
 * <p>
 * Updating the accession count of an institute during every upsert and delete
 * is not efficient. Callers register a request with
 * {@link #recountInstitute(FaoInstitute)} instead; requests are kept in a
 * {@link DelayQueue} keyed by institute code, and a newer request for the same
 * institute replaces the older one and restarts its delay.
 * <p>
 * Expired requests are processed by {@link #processQueues()}, which is
 * scheduled with a fixed delay of {@code DELAY_BETWEEN_QUEUESCANS}
 * milliseconds. Requests still queued at shutdown are processed by
 * {@link #destroy()}.
 *
 * @author Matija Obreza
 */
@Component
public class AccessionCounter implements InitializingBean, DisposableBean {

    private static final Logger LOG = LoggerFactory.getLogger(AccessionCounter.class);

    /** Interval (in ms) between scans of the queue for expired requests. */
    private static final int DELAY_BETWEEN_QUEUESCANS = 5000; // 5 sec

    /** Delay (in ms) applied to a request; a repeated request restarts it. */
    private static final long MINTIME_BETWEEN_UPDATES = 60 * 1000; // 1 min

    /**
     * Pending recount requests, one per institute code. The queue instance is
     * also the monitor guarding the compound remove-then-put in
     * {@link #recountInstitute(FaoInstitute)}, so it must never be reassigned.
     */
    private final DelayQueue<DelayedObject<String>> instituteQueue = new DelayQueue<>();

    @Autowired(required = false)
    private InstituteService instituteService;

    @Autowired(required = false)
    private GenesysService genesysService;

    /**
     * Nothing to initialize: the queue is created together with the instance,
     * so the component is usable even before (or without) this callback.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Intentionally empty; retained because the class implements InitializingBean.
    }

    /**
     * Processes every request still queued, whether or not its delay has
     * elapsed, so that pending recounts are not lost on shutdown.
     */
    @Override
    public void destroy() throws Exception {
        final List<DelayedObject<String>> pending;
        synchronized (instituteQueue) {
            // Snapshot and empty the queue under the lock; the (potentially slow)
            // service calls then run without blocking recountInstitute callers.
            pending = new ArrayList<>(instituteQueue);
            instituteQueue.clear();
        }
        LOG.info("Running {} re-counts before shutdown", pending.size());
        for (DelayedObject<String> forProcessing : pending) {
            processQuietly(forProcessing);
        }
    }

    /**
     * Schedules a recount for the institute. If a request for the same
     * institute code is already queued it is replaced, which restarts the
     * {@code MINTIME_BETWEEN_UPDATES} delay.
     *
     * @param institute the institute to recount; {@code null} is ignored
     */
    public void recountInstitute(FaoInstitute institute) {
        if (institute == null) {
            return;
        }
        final String instCode = institute.getCode();
        final DelayedObject<String> delay = new DelayedObject<>(instCode, MINTIME_BETWEEN_UPDATES);
        synchronized (instituteQueue) {
            // contains() scans the whole queue; only pay for it when tracing.
            if (LOG.isTraceEnabled()) {
                LOG.trace("Queue size {}, contains {} = {}", instituteQueue.size(), instCode, instituteQueue.contains(delay));
            }
            // Entries are equal when their codes are equal, so this drops the older request.
            if (!instituteQueue.remove(delay)) {
                LOG.trace("Element was not removed {}", instCode);
            }
            instituteQueue.put(delay);
        }
        LOG.trace("Rescheduled count for {}", instCode);
    }

    /**
     * Processes all requests whose delay has elapsed. A failure while
     * processing one request is logged and does not prevent the remaining
     * expired requests from being processed in the same scan.
     */
    @Scheduled(fixedDelay = DELAY_BETWEEN_QUEUESCANS)
    public void processQueues() {
        DelayedObject<String> forProcessing;
        while ((forProcessing = pollExpired()) != null) {
            processQuietly(forProcessing);
        }
    }

    /** Returns the next expired request, or {@code null} if none has expired. */
    private DelayedObject<String> pollExpired() {
        synchronized (instituteQueue) {
            return instituteQueue.poll();
        }
    }

    /** Runs {@link #process(DelayedObject)} and logs, rather than propagates, a failure. */
    private void processQuietly(DelayedObject<String> forProcessing) {
        try {
            process(forProcessing);
        } catch (RuntimeException e) {
            LOG.error("Failed to update count for {}", forProcessing.getObj(), e);
        }
    }

    /**
     * Looks up the institute by the queued code and updates its accession
     * count and PDCI. Does nothing for a {@code null} request, when the
     * services are not available, or when no institute is found.
     */
    private void process(DelayedObject<String> forProcessing) {
        if (forProcessing == null) {
            return;
        }
        if (instituteService == null || genesysService == null) {
            LOG.warn("Cannot update count for {}, no services.", forProcessing.getObj());
            return;
        }
        FaoInstitute institute = instituteService.findInstitute(forProcessing.getObj());
        if (institute != null) {
            LOG.info("Updating count for {}", institute.getCode());
            genesysService.updateAccessionCount(institute);
            LOG.info("Updating PDCI for {}", institute.getCode());
            genesysService.updatePDCI(institute);
        }
    }

    /**
     * A payload with an expiry time, for use in a {@link DelayQueue}.
     * <p>
     * Equality and hash code are based on the payload only, while ordering is
     * based on the expiry time only; ordering is therefore deliberately not
     * consistent with {@code equals}. This lets a new request find and replace
     * a queued request for the same payload.
     *
     * @param <T> payload type
     */
    public static class DelayedObject<T> implements Delayed {

        private final T obj;

        /** Expiry instant in milliseconds, on the {@link System#currentTimeMillis()} clock. */
        private final long expiryTime;

        /**
         * @param obj the payload
         * @param delay delay in milliseconds
         */
        DelayedObject(T obj, long delay) {
            this.obj = obj;
            this.expiryTime = System.currentTimeMillis() + delay;
        }

        public T getObj() {
            return this.obj;
        }

        @Override
        public int compareTo(Delayed other) {
            if (this == other) {
                return 0;
            }
            if (other instanceof DelayedObject) {
                // Compare the fixed expiry instants: reading the clock once per side
                // could order two entries with the same expiry inconsistently.
                return Long.compare(this.expiryTime, ((DelayedObject<?>) other).expiryTime);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long delay = unit.convert(expiryTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            // The queue calls this frequently; skip argument boxing unless tracing.
            if (LOG.isTraceEnabled()) {
                LOG.trace("{} scheduled in {}{}", this.obj, delay, unit.name());
            }
            return delay;
        }

        @Override
        public int hashCode() {
            return 31 + Objects.hashCode(obj);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            return Objects.equals(this.obj, ((DelayedObject<?>) o).obj);
        }
    }
}