ElasticReindexProcessor.java

/*
 * Copyright 2018 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.genesys.server.component.elastic;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import javax.annotation.Resource;

import org.genesys.server.service.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * ES Processor component uses Spring's @Scheduled annotation to scan the
 * reindex queue with a 1100ms delay measured from the completion time of each
 * preceding invocation.
 *
 * <p>
 * Queued {@link ElasticReindex} entries are grouped by entity class and the
 * collected ids are handed to {@link ElasticsearchService} in batches.
 * </p>
 */
@Component("elasticReindexProcessor")
public class ElasticReindexProcessor {

	/** The Constant LOG. */
	public static final Logger LOG = LoggerFactory.getLogger(ElasticReindexProcessor.class);

	/** Number of ids collected for one class before an update is dispatched early. */
	private static final int BATCH_SIZE = 100;

	/** Maximum number of queue entries drained in a single pass. */
	private static final int DRAIN_SIZE = 200;

	@Resource
	private BlockingQueue<ElasticReindex> elasticReindexQueue;

	@Autowired
	private ElasticsearchService elasticsearch;

	/**
	 * Creates the processor; dependencies are injected by the container afterwards.
	 */
	public ElasticReindexProcessor() {
		// Use the logger instead of writing directly to stderr
		LOG.debug("Making ElasticReindexProcessor");
	}

	/**
	 * Drains the reindex queue, groups the queued ids by entity class and
	 * dispatches them to Elasticsearch. Full batches are dispatched while
	 * draining; whatever remains is dispatched once the queue is empty.
	 */
	@Scheduled(fixedDelay = 1100)
	public void processQueues() {
		if (LOG.isTraceEnabled()) {
			LOG.trace("Scanning ES update queue size={}", elasticReindexQueue.size());
		}
		if (elasticReindexQueue.isEmpty()) {
			return;
		}

		List<ElasticReindex> forReindexing = new ArrayList<>(DRAIN_SIZE);
		Map<Class<?>, Set<Long>> buckets = Collections.synchronizedMap(new LinkedHashMap<Class<?>, Set<Long>>());

		// Keep draining until the queue yields nothing more
		while (elasticReindexQueue.drainTo(forReindexing, DRAIN_SIZE) > 0) {
			LOG.debug("Remaining for reindex={} handling={}", elasticReindexQueue.size(), forReindexing.size());
			forReindexing.forEach(er -> bucketize(buckets, er));
			forReindexing.clear();
		}

		// Flush partially filled buckets
		for (Map.Entry<Class<?>, Set<Long>> entry : buckets.entrySet()) {
			dispatch(entry.getKey(), entry.getValue());
		}
		buckets.clear();
	}

	/**
	 * Adds the id of the queued entry to the bucket of its class and dispatches
	 * the bucket as soon as it holds {@link #BATCH_SIZE} ids.
	 *
	 * @param buckets ids collected so far, keyed by entity class
	 * @param toReindex the queued entry; ignored when {@code null}
	 */
	private void bucketize(final Map<Class<?>, Set<Long>> buckets, final ElasticReindex toReindex) {
		if (toReindex == null) {
			return;
		}

		Class<?> clazz = toReindex.getClazz();
		Set<Long> bucket = buckets.computeIfAbsent(clazz, k -> Collections.synchronizedSet(new LinkedHashSet<Long>()));
		bucket.add(toReindex.getId());

		if (bucket.size() >= BATCH_SIZE) {
			dispatch(clazz, bucket);
		}
	}

	/**
	 * Hands the ids in the bucket over for updating and empties the bucket.
	 * Empty buckets are skipped so no update is requested for zero ids.
	 *
	 * @param clazz entity class the ids belong to
	 * @param bucket ids to update; cleared on return
	 */
	private void dispatch(final Class<?> clazz, final Set<Long> bucket) {
		if (bucket.isEmpty()) {
			return;
		}
		// NOTE(review): asyncUpdate presumably reads the ids on another thread after
		// returning (inferred from its name). Pass a private copy so that clearing
		// the bucket below cannot empty the collection while it is still in use.
		Set<Long> snapshot = new LinkedHashSet<>(bucket);
		elasticsearch.asyncUpdate(clazz, snapshot);
		bucket.clear();
	}

}