// AccessionCounter.java

/*
 * Copyright 2018 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.genesys.server.service.worker;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import org.genesys.server.model.impl.FaoInstitute;
import org.genesys.server.service.GenesysService;
import org.genesys.server.service.InstituteService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Updating {@link FaoInstitute} accession count during upsert and delete is not
 * efficient. This component keeps requests for updates in a delayed queue,
 * keeping only the most recent request for each institute code.
 * 
 * A request becomes due {@code MINTIME_BETWEEN_UPDATES} milliseconds after the
 * last call to {@link #recountInstitute(FaoInstitute)} for that institute. Due
 * requests are picked up by {@link #processQueues()}, which is scheduled with a
 * fixed delay of {@code DELAY_BETWEEN_QUEUESCANS} milliseconds. Requests still
 * pending at shutdown are executed by {@link #destroy()}.
 * 
 * @author Matija Obreza
 */
@Component
public class AccessionCounter implements InitializingBean, DisposableBean {

	private static final Logger LOG = LoggerFactory.getLogger(AccessionCounter.class);

	// Interval time (in ms) for scanning the queue for expired updates
	private static final int DELAY_BETWEEN_QUEUESCANS = 5000; // 5 sec
	// If a request comes before this delay (in ms), reschedule
	private static final long MINTIME_BETWEEN_UPDATES = 60 * 1000; // 1 min

	/**
	 * Pending recount requests keyed by institute code. The queue also serves as
	 * the monitor guarding compound operations on it, so it must never be
	 * reassigned.
	 */
	private final DelayQueue<DelayedObject<String>> instituteQueue = new DelayQueue<>();

	@Autowired(required = false)
	private InstituteService instituteService;

	@Autowired(required = false)
	private GenesysService genesysService;

	/**
	 * Nothing to initialize: the queue is created together with the instance so
	 * that requests registered before this callback are accepted and kept.
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		// Intentionally empty, see Javadoc.
	}

	/**
	 * Executes all pending recounts immediately, regardless of their remaining
	 * delay, and empties the queue.
	 */
	@Override
	public void destroy() throws Exception {
		final List<DelayedObject<String>> pending;
		synchronized (instituteQueue) {
			// Snapshot and clear under the lock; the recounts themselves run
			// outside of it so service calls are not made while holding the monitor.
			pending = new ArrayList<>(instituteQueue);
			instituteQueue.clear();
		}

		LOG.info("Running {} re-counts before shutdown", pending.size());
		for (DelayedObject<String> forProcessing : pending) {
			processSafely(forProcessing);
		}
	}

	/**
	 * Schedules a recount for the institute. An already pending request for the
	 * same institute code is replaced, which restarts its delay.
	 * 
	 * @param institute the institute to recount; {@code null} is ignored
	 */
	public void recountInstitute(FaoInstitute institute) {
		if (institute == null) {
			return;
		}

		DelayedObject<String> delay = new DelayedObject<String>(institute.getCode(), MINTIME_BETWEEN_UPDATES);
		synchronized (instituteQueue) {
			// size() and contains() are evaluated eagerly, so only pay for them
			// when the message will actually be written.
			if (LOG.isTraceEnabled()) {
				LOG.trace("Queue size {}, contains {} = {}", instituteQueue.size(), delay.getObj(), instituteQueue.contains(delay));
			}
			// Equality is based on the institute code only, so this drops the
			// older request (if any) before the fresh one is queued.
			if (!instituteQueue.remove(delay)) {
				LOG.trace("Element was not removed {}", delay.getObj());
			}
			instituteQueue.put(delay);
		}
		LOG.trace("Rescheduled count for {}", institute.getCode());
	}

	/**
	 * Processes every request whose delay has expired. Runs until the queue has
	 * no more expired entries.
	 */
	@Scheduled(fixedDelay = DELAY_BETWEEN_QUEUESCANS)
	public void processQueues() {
		while (true) {
			final DelayedObject<String> forProcessing;
			synchronized (instituteQueue) {
				// poll() returns null when no entry has expired yet
				forProcessing = instituteQueue.poll();
			}
			if (forProcessing == null) {
				return;
			}
			processSafely(forProcessing);
		}
	}

	/**
	 * Runs {@link #process(DelayedObject)} and logs, rather than propagates, a
	 * runtime failure so that one failing institute does not prevent the
	 * remaining requests from being handled.
	 */
	private void processSafely(DelayedObject<String> forProcessing) {
		try {
			process(forProcessing);
		} catch (RuntimeException e) {
			LOG.error("Failed to update counts for {}: {}", forProcessing.getObj(), e.getMessage(), e);
		}
	}

	/**
	 * Looks up the institute by the queued code and updates its accession count
	 * and PDCI. Does nothing for {@code null} input, when the services are not
	 * available or when the institute is not found.
	 */
	private void process(DelayedObject<String> forProcessing) {
		if (forProcessing == null) {
			return;
		}

		if (instituteService == null || genesysService == null) {
			LOG.warn("Cannot update count for {}, no services.", forProcessing.getObj());
			return;
		}

		FaoInstitute institute = instituteService.findInstitute(forProcessing.getObj());

		if (institute != null) {
			LOG.info("Updating count for {}", institute.getCode());
			genesysService.updateAccessionCount(institute);

			LOG.info("Updating PDCI for {}", institute.getCode());
			genesysService.updatePDCI(institute);
		}
	}

	/**
	 * Queue entry wrapping a payload with an absolute expiry time. Two entries
	 * are equal when their payloads are equal; the expiry time is not part of
	 * equality, which lets a new entry replace an older one for the same payload.
	 * Ordering, in contrast, is by expiry time only.
	 */
	public static class DelayedObject<T> implements Delayed {

		private final T obj;
		// Absolute expiry time in epoch milliseconds
		private final long expiryTime;

		/**
		 * 
		 * @param obj the payload
		 * @param delay delay in milliseconds
		 */
		DelayedObject(T obj, long delay) {
			this.obj = obj;
			this.expiryTime = System.currentTimeMillis() + delay;
		}

		public T getObj() {
			return this.obj;
		}

		/**
		 * Orders entries by expiry time. For two {@code DelayedObject}s the stored
		 * expiry times are compared directly, which avoids reading the clock twice
		 * and therefore gives a stable result.
		 */
		@Override
		public int compareTo(Delayed other) {
			if (this == other) {
				return 0;
			}
			if (other instanceof DelayedObject) {
				return Long.compare(this.expiryTime, ((DelayedObject<?>) other).expiryTime);
			}
			return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
		}

		/**
		 * @param unit the unit of the returned value
		 * @return remaining delay in the requested unit; zero or negative once expired
		 */
		@Override
		public long getDelay(TimeUnit unit) {
			long delay = unit.convert(expiryTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
			LOG.trace("{} scheduled in {}{}", this.obj, delay, unit.name());
			return delay;
		}

		@Override
		public int hashCode() {
			// Same value as the previous hand-written 31 * 1 + hash(obj)
			return 31 + Objects.hashCode(obj);
		}

		@Override
		public boolean equals(Object obj) {
			if (this == obj) {
				return true;
			}
			if (obj == null || getClass() != obj.getClass()) {
				return false;
			}
			return Objects.equals(this.obj, ((DelayedObject<?>) obj).obj);
		}
	}

}