// ElasticReindexProcessor.java
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.server.component.elastic;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
import org.genesys.server.service.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
 * ES Processor component uses Spring's @Scheduled annotation to scan the
 * reindex queue with a 1100ms delay measured from the completion time of each
 * preceding invocation.
 *
 * <p>
 * Queued {@link ElasticReindex} entries are grouped by their class into sets of
 * IDs, and each set is handed to {@code ElasticsearchService.asyncUpdate} either
 * when it reaches {@code BATCH_SIZE} entries or when the queue has been fully
 * drained.
 */
@Component("elasticReindexProcessor")
public class ElasticReindexProcessor {

    /** The Constant LOG. */
    public static final Logger LOG = LoggerFactory.getLogger(ElasticReindexProcessor.class);

    /** Number of IDs collected for one class before the batch is sent for update. */
    private static final int BATCH_SIZE = 100;

    /** Maximum number of queue entries drained from the queue in one pass. */
    private static final int DRAIN_SIZE = 200;

    @Resource
    private BlockingQueue<ElasticReindex> elasticReindexQueue;

    @Autowired
    private ElasticsearchService elasticsearch;

    /**
     * Creates the processor; dependencies are injected by the container afterwards.
     */
    public ElasticReindexProcessor() {
        // Diagnostic message goes through the logger instead of raw stderr.
        LOG.debug("Making ElasticReindexProcessor");
    }

    /**
     * Process queues.
     *
     * Drains the reindex queue in chunks of up to {@code DRAIN_SIZE} entries,
     * groups the entries by class and sends every non-empty group of IDs for
     * update. Returns immediately when the queue is empty.
     */
    @Scheduled(fixedDelay = 1100)
    public void processQueues() {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Scanning ES update queue size={}", elasticReindexQueue.size());
        }

        if (elasticReindexQueue.isEmpty()) {
            return;
        }

        final List<ElasticReindex> forReindexing = new ArrayList<>(DRAIN_SIZE);
        // The buckets never leave this invocation (only snapshots are handed
        // out), so plain unsynchronized collections are sufficient.
        final Map<Class<?>, Set<Long>> buckets = new HashMap<>();

        while (elasticReindexQueue.drainTo(forReindexing, DRAIN_SIZE) > 0) {
            LOG.debug("Remaining for reindex={} handling={}", elasticReindexQueue.size(), forReindexing.size());
            for (final ElasticReindex er : forReindexing) {
                bucketize(buckets, er);
            }
            forReindexing.clear();
        }

        // Send whatever is left over after the size-triggered flushes.
        for (final Map.Entry<Class<?>, Set<Long>> entry : buckets.entrySet()) {
            flush(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Adds the entry's ID to the bucket of its class and flushes that bucket
     * once it holds at least {@code BATCH_SIZE} IDs.
     *
     * @param buckets IDs collected so far, keyed by class
     * @param toReindex the queue entry to register; {@code null} is ignored
     */
    private void bucketize(final Map<Class<?>, Set<Long>> buckets, final ElasticReindex toReindex) {
        if (toReindex == null) {
            return;
        }

        final Class<?> clazz = toReindex.getClazz();
        final Set<Long> bucket = buckets.computeIfAbsent(clazz, key -> new HashSet<>());
        bucket.add(toReindex.getId());

        if (bucket.size() >= BATCH_SIZE) {
            flush(clazz, bucket);
        }
    }

    /**
     * Sends the bucket's IDs for update and empties the bucket. Empty buckets
     * are skipped so that no update is requested for zero IDs.
     *
     * @param clazz the class the IDs belong to
     * @param bucket the working set of IDs; emptied by this call
     */
    private void flush(final Class<?> clazz, final Set<Long> bucket) {
        if (bucket.isEmpty()) {
            return;
        }

        // Hand over a private snapshot: the working set is cleared and reused
        // right away, which must not affect an update that is still pending.
        final Set<Long> batch = new HashSet<>(bucket);
        bucket.clear();
        elasticsearch.asyncUpdate(clazz, batch);
    }
}