ElasticsearchServiceImpl.java

/*
 * Copyright 2022 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.genesys.server.service.impl;

import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.multiMatchQuery;
import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery;
import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.collections4.iterators.ReverseListIterator;
import org.apache.commons.collections4.map.CaseInsensitiveMap;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SimpleQueryStringBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.UnmappedTerms;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCount;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.genesys.blocks.model.EmptyModel;
import org.genesys.blocks.model.Publishable;
import org.genesys.blocks.model.VersionedModel;
import org.genesys.blocks.model.filters.EmptyModelFilter;
import org.genesys.blocks.security.SecurityContextUtil;
import org.genesys.blocks.security.model.AclAwareModel;
import org.genesys.custom.elasticsearch.CustomMapping;
import org.genesys.server.api.model.Api1Constants;
import org.genesys.server.component.elastic.ElasticLoader;
import org.genesys.server.component.elastic.ElasticQueryBuilder;
import org.genesys.server.component.elastic.ElasticReindex;
import org.genesys.server.exception.SearchException;
import org.genesys.server.model.genesys.Accession;
import org.genesys.server.service.AccessionService.IBatchAction;
import org.genesys.server.service.ElasticsearchService;
import org.genesys.server.service.filter.AccessionFilter;
import org.genesys.server.service.filter.IFullTextFilter;
import org.genesys.spring.config.ElasticsearchConfig.GenesysEntityMapper;
import org.genesys.spring.config.HazelcastConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.EntityMapper;
import org.springframework.data.elasticsearch.core.query.AliasQuery;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import org.springframework.data.elasticsearch.core.query.FetchSourceFilter;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.data.elasticsearch.core.query.SourceFilter;
import org.springframework.data.geo.Point;
import org.springframework.data.jpa.repository.support.Querydsl;
import org.springframework.data.querydsl.SimpleEntityPathResolver;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.acls.domain.BasePermission;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import com.fasterxml.jackson.module.jsonSchema.types.ObjectSchema;
import com.google.common.collect.Sets;
import com.querydsl.core.BooleanBuilder;
import com.querydsl.core.types.EntityPath;
import com.querydsl.core.types.Predicate;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.core.types.dsl.PathBuilderFactory;
import com.querydsl.core.types.dsl.SetPath;
import com.querydsl.jpa.JPQLQuery;

import lombok.NonNull;

/**
 * Manage Elasticsearch indexing, indices.
 *
 * @author Matija Obreza
 */
// Not @Service
public class ElasticsearchServiceImpl implements ElasticsearchService, InitializingBean {

	private static final String ACCESSION_GEO_LONGITUDE = Api1Constants.Accession.LONGITUDE;

	private static final String ACCESSION_GEO_LATITUDE = Api1Constants.Accession.LATITUDE;

	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchServiceImpl.class);

	private static final String INDEX_READ = "_read";
	private static final String INDEX_WRITE = "_write";
	private static final String ALIAS_EVERYTHING = "Everything";
	private static final String COMMON_TYPE_NAME = "data";

	@Value("${elasticsearch.cluster.nodes}")
	private String esClusterNodes;

	private static final ObjectMapper objectMapper = new ObjectMapper();

	/** The Constant DEFAULT_SOURCE_FILTER. */
	protected static final SourceFilter DEFAULT_SOURCE_FILTER = new FetchSourceFilter(new String[] { "id", "_class", "title", "code", "description" }, new String[] {});

	protected static final Set<String> RESERVED_CHARACTERS = Set.of("/", "+", "-", "=", "&&", "||", "!", "(", ")", "{", "}", "[", "]", "^", "~", "*", "?", ":");

	@Resource
	private BlockingQueue<ElasticReindex> elasticReindexQueue;

	@Resource
	@Qualifier("elasticsearchReindexLock")
	private Lock elasticsearchReindexLock;

	@Autowired
	private TaskExecutor taskExecutor;

	@Autowired
	private EntityManager em;

	@Autowired
	private ElasticsearchOperations elasticsearchRestTemplate;

	@Autowired
	private RestHighLevelClient client;

	@Autowired
	private EntityMapper mapper;

	@Autowired
	private ElasticsearchService _self;

	private final Set<Class<? extends EmptyModel>> indexedEntities = Collections.synchronizedSet(new HashSet<>());
	private final Map<String, Class<EmptyModel>> namesToClasses = Collections.synchronizedMap(new HashMap<>());
	private final Map<Class<? extends EmptyModel>, Set<String>> jsonSchemas = new HashMap<>();

	@Resource
	@Qualifier("clusterFlags")
	private Map<String, Boolean> clusterFlags;

	@Override
	public void afterPropertiesSet() throws Exception {
		if (withElasticsearch()) {
			for (Class<? extends EmptyModel> clazz : indexedEntities) {
				final String writeIndex = getFirstIndexForAlias(toIndexName(clazz) + INDEX_WRITE);
				if (writeIndex == null) {
					ensureWriteAlias(clazz);
				} else {
					LOG.info("Updating write index {} for {}", writeIndex, clazz.getName());
					Object mapping = CustomMapping.springDataMapping(elasticsearchRestTemplate.getElasticsearchConverter(), clazz, COMMON_TYPE_NAME);
					try {
						elasticsearchRestTemplate.putMapping(writeIndex, COMMON_TYPE_NAME, mapping);
					} catch (Throwable e) {
						LOG.error("The ES mapping is no longer compatible for index={} of {}. Please regenerate.", writeIndex, clazz);
					}
				}
			}
		} else {
			LOG.warn("Elasticsearch not accessible, not updating mappings");
		}

		JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(((GenesysEntityMapper) mapper).getObjectMapper());
		for (Class<? extends EmptyModel> clazz: indexedEntities) {
			try {
				JsonSchema schema = schemaGen.generateSchema(clazz);
				jsonSchemas.put(clazz, buildJsonPaths(((ObjectSchema) schema).getProperties(), null));
			} catch (Throwable e) {
				LOG.error("The list of all {} fields is not created.", clazz.getSimpleName(), e);
			}
		}
	}
	
	@Override
	public long waitForCount(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filter, int mustHaveCount) throws SearchException {
		long count = 0;
		int repeats = 0;
		do {
			try {
				var readIndexName = getFirstIndexForAlias(toIndexName(clazz) + INDEX_READ);
				var writeIndexName = getFirstIndexForAlias(toIndexName(clazz) + INDEX_WRITE);

				if (! Objects.equals(readIndexName, writeIndexName)) {
					LOG.warn("Read/write index mismatch {}!={}.", readIndexName, writeIndexName);
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
					}
					throw new ElasticsearchStatusException("Read/write index mismatch,", null); // This is caught later and will reset counts.
				}

				count = count(clazz, filter);
				if (count != mustHaveCount) {
					LOG.debug("ES count of {} is {}!={}", clazz.getName(), count, mustHaveCount);
					if (repeats++ > 10) {
						throw new RuntimeException("ES count did not settle in 10 retries.");
					}
					try {
						var sleepTime = (repeats * 200L) + RandomUtils.nextLong((repeats) * 50L, (repeats + 1) * 100L);
						LOG.warn("ES count #{} for {} is {}!={}, sleeping for {}ms...", repeats, clazz.getName(), count, mustHaveCount, sleepTime);
						Thread.sleep(sleepTime);
					} catch (InterruptedException e) {
						return count;
					}
				}
			} catch (ElasticsearchStatusException e) {
				count = -1;
				repeats = 0;
				LOG.warn("Error while waiting for count: {}. Retrying...", e.getMessage());
			}
		} while (count != mustHaveCount);
		return count;
	}

	/**
	 * Makes a list of all JSON paths for indexed entity and all related types.
	 *
	 * @param properties properties
	 * @param parentPath parentPath
	 */
	private Set<String> buildJsonPaths(Map<String, JsonSchema> properties, String parentPath) {
		if (MapUtils.isEmpty(properties)) {
			return Collections.emptySet();
		}

		Set<String> fieldList = new HashSet<>();
		Set<Map.Entry<String, JsonSchema>> entries = properties.entrySet();
		entries.removeIf(e -> e.getKey().equals("_class") || e.getKey().equals("_permissions"));

		for (var entry : entries) {
			JsonSchema schema = entry.getValue();
			String fullPath = StringUtils.isBlank(parentPath) ? entry.getKey() : parentPath + "." + entry.getKey();

			if (schema instanceof ObjectSchema) {
				fieldList.addAll(buildJsonPaths(((ObjectSchema)schema).getProperties(), fullPath));
			} else {
				fieldList.add(fullPath);
			}
		}
		return fieldList;
	}

	private boolean withElasticsearch() {
		try {
			return RestStatus.OK == client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT).status();
		} catch (Throwable e) {
			LOG.warn(e.getMessage());
			return false;
		}
	}

	@Override
	public void reindexAll() {
		for (Class<? extends EmptyModel> clazz : indexedEntities) {
			if (Boolean.TRUE.equals(clusterFlags.get(HazelcastConfig.ClusterFlags.STOP_REINDEX_ALL.value)) || Boolean.TRUE.equals(clusterFlags.get(HazelcastConfig.ClusterFlags.STOP_REINDEX.value))) {
				break;
			}
			reindex(clazz);
		}
		LOG.warn("Clearing STOP_REINDEX flags");
		clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX.value, false);
		clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX_ALL.value, false);
	}

	@Override
	public void stopReindex() {
		clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX.value, true);
	}

	@Override
	public void stopReindexAll() {
		LOG.warn("Setting STOP_REINDEX flags");
		clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX.value, true);
		clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX_ALL.value, true);
	}

	@Override
	public void allowReindexAll() {
		LOG.warn("Clearing STOP_REINDEX flags");
		clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX.value, false);
		clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX_ALL.value, false);
	}

	@Override
	public TermResult recountResult(Class<? extends EmptyModel> clazz, SetPath<?, ?> setPath, EmptyModelFilter<?, ?> filter, TermResult toRecount, String termName) throws ExecutionException, InterruptedException, SearchException {
		String indexName = toIndexName(clazz) + INDEX_READ;
		
		QueryBuilder q1 = toEsQuery(clazz, filter, setPath.size().gt(0)); // non-empty []
		QueryBuilder q2 = toEsQuery(clazz, filter, setPath.size().eq(0)); // empty []

		ValueCountAggregationBuilder aggregation = AggregationBuilders.count(termName).field(termName);

		SearchSourceBuilder sourceBuilder = makeSearchSourceBuilder(clazz, q1); 
		sourceBuilder.size(0).aggregation(aggregation);
		SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
		ValueCount termAgg = search(searchRequest, RequestOptions.DEFAULT).getAggregations().get(termName);
		long setCount = termAgg.getValue();
		
		sourceBuilder = makeSearchSourceBuilder(clazz, q2);
		sourceBuilder.size(0).aggregation(aggregation);
		SearchRequest entityEsQuery = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
		long entityCount = search(entityEsQuery, RequestOptions.DEFAULT).getHits().getTotalHits();

		return new TermResult(termName, setCount + entityCount, toRecount.getTerms(), toRecount.getOther());
	}

	@SuppressWarnings("unchecked")
	@Override
	public <R extends EmptyModel> void indexEntity(Class<R> clazz) {

		synchronized (this) {
			if (indexedEntities.contains(clazz)) {
				LOG.warn("Entity {} already set for indexing.", clazz.getName());
				return;
			}
		}

		indexedEntities.add(clazz);
		namesToClasses.put(clazz.getSimpleName(), (Class<EmptyModel>) clazz);
	}

	@Override
	public List<Class<?>> getIndexedEntities() {
		ArrayList<Class<?>> entities = new ArrayList<>(this.indexedEntities);
		entities.sort(Comparator.comparing(Class::getName));
		return ListUtils.unmodifiableList(entities);
	}

	@Override
	public <R> void removeAll(Class<R> clazz) throws SearchException {
		if (!indexedEntities.contains(clazz)) {
			LOG.debug("Class {} is not indexed.", clazz.getName());
			return;
		}

		if (!withElasticsearch()) {
			LOG.warn("Elasticsearch not accessible, not reindexing {}", clazz.getName());
			return;
		}

		LOG.info("Deleting all docs for {}", clazz.getName());
		DeleteQuery dq = new DeleteQuery();
		dq.setIndex(toIndexName(clazz) + INDEX_WRITE);
		dq.setType(COMMON_TYPE_NAME);
		dq.setQuery(new MatchAllQueryBuilder());
		elasticsearchRestTemplate.delete(dq);
	}

	@Override
	public <R> void reindex(Class<R> clazz) {
		boolean isLockAcquired = elasticsearchReindexLock.tryLock();
		if (isLockAcquired) {
			try {
				internalReindex(clazz);
			} finally {
				elasticsearchReindexLock.unlock();
			}
		} else {
			throw new RuntimeException("Could not run reindex for " + clazz.getSimpleName() + ". Operation locked.");
		}
	}

	private <R> void internalReindex(Class<R> clazz) {
		if (!indexedEntities.contains(clazz)) {
			LOG.warn("Class {} is not indexed.", clazz.getName());
			return;
		}

		if (!withElasticsearch()) {
			LOG.warn("Elasticsearch not accessible, not reindexing {}", clazz.getName());
			return;
		}

		LOG.info("Reindexing {}", clazz.getName());
		final String indexRoot = toIndexName(clazz);

		// Figure out existing index name
		String currentIndexName = getFirstIndexForAlias(indexRoot + INDEX_READ);
		if (currentIndexName == null) {
			currentIndexName = getFirstIndexForAlias(indexRoot + INDEX_WRITE);
		}

		// Make new index with timestamp
		final String indexName = createIndexFor(clazz);
		// The old index name
		final String oldIndexName = currentIndexName;

		// Scan
		scanDatabase(clazz, null);

		// Schedule rename
		taskExecutor.execute(() -> {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// Fine...
			}
			try {
				do {
					var queueSize = elasticReindexQueue.size();
					if (queueSize == 0) {
						break;
					}
					LOG.trace("ES Reindex queue has {} elements, waiting to realias {}...", queueSize, indexName);
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// Fine...
					}
				} while (true);
	
				LOG.info("ES Reindex queue is empty, realiasing {}", indexName);
				// Move _READ alias
				realias(indexRoot + INDEX_READ, indexName);
				// Add to EVERYTHING
				deleteAlias(ALIAS_EVERYTHING, indexRoot + "*");
				addAlias(ALIAS_EVERYTHING, indexName);
	
				// delete old index
				if (oldIndexName != null) {
					LOG.info("Deleting old index {}", oldIndexName);
					elasticsearchRestTemplate.deleteIndex(oldIndexName);
				}
			} catch (Throwable e) {
				LOG.warn("Realiasing after full scan threw an exception: {}", e.getMessage(), e);
			}
		});
	}

	@Override
	public <T extends EmptyModel> void reindex(Class<T> clazz, EmptyModelFilter<?, ?> filter) {
		scanDatabase(clazz, filter);
	}

	@Override
	public <T extends EmptyModel> void remove(Class<T> clazz, EmptyModelFilter<?, T> filter) throws SearchException {
		String indexName = toIndexName(clazz) + INDEX_READ; // Remove from READ index
		long esCount = count(clazz, filter);

		LOG.warn("ES removing {} documents of {}", esCount, clazz.toString());
		DeleteQuery dq = new DeleteQuery();
		dq.setIndex(indexName);
		dq.setType(COMMON_TYPE_NAME);
		dq.setQuery(toEsQuery(clazz, filter));
		elasticsearchRestTemplate.delete(dq);
	}

	/**
	 * Creates the index for the entity. It assigns it the _WRITE alias
	 *
	 * @param clazz the clazz
	 * @return the new index name
	 */
	private String createIndexFor(Class<?> clazz) {
		final String indexRoot = toIndexName(clazz);
		final String indexName = indexRoot + System.currentTimeMillis();

		LOG.info("Creating new index {} for {}", indexName, clazz.getName());


		/*@formatter:off*/
		MapBuilder<String, Object> settingsBuilder = new MapBuilder<String, Object>()
//			.put("index.blocks.read_only_allow_delete", null)
//			.put("index.mapping.total_fields.limit", 1000)
//			.put("index.number_of_shards", 1)
		;
		/*@formatter:on*/

		if (! "embedded".equals(esClusterNodes)) {
//			/*@formatter:off*/
//			settingsBuilder
//				// Analyzer
//				.put("index.analysis.analyzer.default.tokenizer", "standard")
//				.put("index.analysis.analyzer.default.filter.0", "asciifolding2")
//				// Filter
//				.put("index.analysis.filter.asciifolding2.type", "asciifolding")
//				.put("index.analysis.filter.asciifolding2.preserve_original", true)
//			;
//			/*@formatter:on*/
		} else {
			// Embedded ES
			settingsBuilder
				.put("index.number_of_shards", 1)
				.put("index.number_of_replicas", 0);
		}

		elasticsearchRestTemplate.createIndex(indexName, settingsBuilder.map());
		Object mapping = CustomMapping.springDataMapping(elasticsearchRestTemplate.getElasticsearchConverter(), clazz, COMMON_TYPE_NAME);
		elasticsearchRestTemplate.putMapping(indexName, COMMON_TYPE_NAME, mapping);

		// Add _WRITE alias
		realias(indexRoot + INDEX_WRITE, indexName);
		return indexName;
	}

	private <R> void scanDatabase(Class<R> clazz, EmptyModelFilter<?, ?> filter) {

		PathBuilder<R> builder = new PathBuilderFactory().create(clazz);
		Querydsl querydsl = new Querydsl(em, builder);
		EntityPath<R> entityPath = SimpleEntityPathResolver.INSTANCE.createPath(clazz);
		PathBuilder<R> pathBuilder = new PathBuilder<R>(clazz, entityPath.getMetadata().getName());
		JPQLQuery<Long> query = querydsl.createQuery(entityPath)
			// select id only
			.select(pathBuilder.getNumber("id", Long.class))
			// and order by id
			.orderBy(pathBuilder.getNumber("id", Long.class).asc());

		if (filter != null) {
			// apply filter
			query.where(filter.buildPredicate());
		}

		// Integer scanSize = reindexBatchSize.get(clazz);
		final int customBatchSize = 5000; // scanSize == null ? batchSize : scanSize.intValue();

		int startPosition = 0;
		query.offset(startPosition);
		query.limit(customBatchSize);

		StopWatch stopWatch = new StopWatch();
		stopWatch.start();
		List<Long> results;
		long scanSleepMs = 0;
		var maxQueueSize = customBatchSize * 50;
		if (maxQueueSize > 20000) maxQueueSize = 20000;
		do {
			var queueSize = elasticReindexQueue.size();
			if (queueSize > maxQueueSize) {
				if (scanSleepMs < 1) {
					scanSleepMs = 5;
				}
				var sleeper = -1;
				do {
					sleeper++;
					LOG.info("Queue has {} elements. Delaying read by {}ms", queueSize, scanSleepMs);
					try {
						Thread.sleep(scanSleepMs);
					} catch (InterruptedException e) {
						LOG.warn("Delayed read interrupted!", e);
					}
				} while ((queueSize = elasticReindexQueue.size()) > maxQueueSize);
				scanSleepMs += sleeper * (scanSleepMs * 0.1);
			} else if (scanSleepMs > 0) {
				scanSleepMs -= (scanSleepMs * 0.1);
			}
			stopWatch.split();
			if (LOG.isDebugEnabled()) {
				LOG.debug("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, customBatchSize);
			}
			results = query.fetch();

			// Schedule indexing
			elasticReindexQueue.addAll(results.stream().map((res) -> new ElasticReindex(clazz, res)).collect(Collectors.toList()));

			// Next page
			query.offset(startPosition += results.size());
		} while (
				results.size() > 0 // We have data
				&& !Boolean.TRUE.equals(clusterFlags.get(HazelcastConfig.ClusterFlags.STOP_REINDEX.value)) // Not stopped
		);

		var stopReindex = Boolean.TRUE.equals(clusterFlags.get(HazelcastConfig.ClusterFlags.STOP_REINDEX.value));
		LOG.debug("Checking for stop reindex value. StopReindex = {}", stopReindex);
		if (stopReindex) {
			elasticReindexQueue.clear();
			clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX.value, false);
		}
		stopWatch.stop();
		LOG.info("Scanning {} for reindex took {}ms", clazz.getName(), stopWatch.getTime());
	}

	/**
	 * Schedule a parallel update.
	 *
	 * @param clazz Entity to reindex
	 * @param entityIds Entity identifiers to reindex
	 */
	@Override
	public <R> void asyncUpdate(final Class<R> clazz, final Collection<Long> entityIds) {
		if (!indexedEntities.contains(clazz)) {
			LOG.warn("Class {} is not indexed.", clazz.getName());
			return;
		}

		if (entityIds.isEmpty()) {
			return;
		}

		final Set<Long> copy = new LinkedHashSet<>(entityIds);

		taskExecutor.execute(() -> {
			LOG.debug("Running scheduled reindex of {} size={}", clazz.getName(), copy.size());
			try {
				_self.update(clazz, copy);
			} catch (Throwable e) {
				LOG.error("Error updating {}: {}", clazz.getSimpleName(), e.getMessage(), e);
			}
		});
	}

	/**
	 * Will not modify the list of IDs
	 */
	@Override
	@Transactional(readOnly = true)
	public <R> void update(final Class<R> clazz, final Collection<Long> ids) {
		if (!indexedEntities.contains(clazz)) {
			LOG.warn("Class {} is not indexed.", clazz.getName());
			return;
		}

		if (ids.isEmpty()) return;

		Set<Long> notFoundIds = new LinkedHashSet<>(ids);

		final String indexName = toIndexName(clazz) + INDEX_WRITE;

		CriteriaBuilder cb = em.getCriteriaBuilder();
		CriteriaQuery<R> cq = cb.createQuery(clazz);
		Root<R> root = cq.from(clazz);
		cq.where(root.get("id").in(ids));

		TypedQuery<R> query = em.createQuery(cq);
		List<R> results = query.getResultList();

		List<IndexQuery> queries = new LinkedList<IndexQuery>();

		Map<String, String> jsons = new LinkedHashMap<>();
		var printedOne = false;
		for (R x : results) {
			LOG.trace("Indexing {} {}", clazz.getName(), x);

			EmptyModel bm = (EmptyModel) x;
			if (x instanceof ElasticLoader) {
				// Prepare entity for indexing (e.g. lazy-load)
				((ElasticLoader) x).prepareForIndexing();
			}
			try {
				var json = mapper.mapToString(bm);
				if (json.length() > 10000) {
					LOG.warn("Large indexing request {} id={} length={}", clazz, bm.getId(), json.length());
					if (!printedOne && LOG.isInfoEnabled()) {
						printedOne = true;
						LOG.info("One example of a large indexing request {} id={}:\n{}", clazz, bm.getId(), json);
					}
				}
				if (LOG.isTraceEnabled()) {
					LOG.trace("JSON: {}", json);
				}
				jsons.put(bm.getId().toString(), json);
			} catch (IOException e) {
				e.printStackTrace();
			}
			// is found
			notFoundIds.remove(bm.getId());

			IndexQuery iq = new IndexQuery();
			iq.setIndexName(indexName);
			iq.setType(COMMON_TYPE_NAME);
			iq.setId(String.valueOf(bm.getId()));
			iq.setObject(bm);

			queries.add(iq);
		}

		if (!queries.isEmpty()) {
			LOG.debug("Indexing {} count={} of provided objects count={}", clazz.getName(), queries.size(), ids.size());
			try {
				elasticsearchRestTemplate.bulkIndex(queries);
			} catch (org.springframework.data.elasticsearch.ElasticsearchException e) {
				LOG.error(e.getMessage(), e);
				Map<String, String> failedDocs = e.getFailedDocuments();
				if (failedDocs != null) {
					for (var entry : failedDocs.entrySet()) {
						LOG.error("{} {}\n{}", entry.getKey(), entry.getValue(), jsons.get(entry.getKey()));
					}
				}
			} catch (ElasticsearchException e) {
				LOG.error("Could not index document", e);
			}
		}

//		for (R x : results) {
//			// detach from EM
//			em.detach(x);
//		}
		
		em.clear();
		queries.clear();

		for (Long id : notFoundIds) {
			LOG.trace("Removing {} id={} from index {}/{}", clazz.getName(), id, indexName, COMMON_TYPE_NAME);
			String res = elasticsearchRestTemplate.delete(indexName, COMMON_TYPE_NAME, String.valueOf(id));
			LOG.trace("Deleted ES document id={}", res);
		}
	}

	/**
	 * Checks that index "write" alias exists. When alias is not found, a new index
	 * is created and alias points to it.
	 *
	 * @param clazz for type mapping
	 */
	protected synchronized void ensureWriteAlias(Class<?> clazz) {
		String indexRoot = toIndexName(clazz);
		String aliasWrite = indexRoot + INDEX_WRITE;
		if (!aliasExists(aliasWrite)) {
			final String indexName = createIndexFor(clazz);
			String aliasRead = indexRoot + INDEX_READ;

			if (!aliasExists(aliasRead)) {
				deleteAlias(ALIAS_EVERYTHING, indexRoot + "*");
				addAlias(ALIAS_EVERYTHING, indexName);
				addAlias(aliasRead, indexName);
			}
		}
	}

	/**
	 * Get index name for clazz
	 *
	 * @param clazz
	 * @return
	 */
	private String toIndexName(Class<?> clazz) {
		return clazz.getSimpleName().toLowerCase();
	}

	private boolean aliasExists(final String aliasName) {
		try {
			return client.indices().existsAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);
		} catch (IOException e) {
			return false;
		}
	}

	/**
	 * Make the alias point exclusively to the specified index
	 *
	 * @param aliasName The alias name
	 * @param indexName The index the alias points to
	 */
	@Override
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public void realias(final String aliasName, final String indexName) {
		deleteAlias(aliasName);
		addAlias(aliasName, indexName);
	}

	@Override
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public void addAlias(String aliasName, String indexName) {
		final AliasQuery query = new AliasQuery();
		query.setAliasName(aliasName);
		query.setIndexName(indexName);
		LOG.info("Adding alias {} to index {}", aliasName, indexName);
		this.elasticsearchRestTemplate.addAlias(query);
	}

	private String getFirstIndexForAlias(final String aliasName) {
		try {
			GetAliasesResponse response = client.indices().getAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);
			return response != null && !response.getAliases().isEmpty() ? (String) response.getAliases().keySet().toArray()[0] : null;
		} catch (Throwable e) {
			LOG.warn("Error while getting index for alias={}:", aliasName, e);
			return null;
		}
	}

	/**
	 * Delete alias.
	 *
	 * @param aliasName the alias name
	 */
	@Override
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public void deleteAlias(final String aliasName) {
		try {
			GetAliasesResponse response = client.indices().getAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);

			if (response != null && !response.getAliases().isEmpty()) {
				for (final String indexName: response.getAliases().keySet()) {
					deleteAlias(aliasName, indexName);
				}
			}
		} catch (IOException e) {
			LOG.warn("Error while deleting alias={}:", aliasName, e);
		}
	}

	@Override
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public void deleteIndex(final String indexName) {
		elasticsearchRestTemplate.deleteIndex(indexName);
	}

	private void deleteAlias(String aliasName, String indexName) {
		final AliasQuery query = new AliasQuery();
		query.setAliasName(aliasName);
		query.setIndexName(indexName);

		try {
			this.elasticsearchRestTemplate.removeAlias(query);
			LOG.info("Removed alias {} from index {}", aliasName, indexName);
		} catch (ElasticsearchStatusException | AliasesNotFoundException e) {
			LOG.warn("Alias {} does not exist on index {}", aliasName, indexName);
		}
	}

	@Override
	@Transactional(readOnly = true)
	public <T extends EmptyModel> List<T> search(QueryBuilder shouldMatch, String searchQuery, Class<T> clazz) {
		if (!indexedEntities.contains(clazz)) {
			throw new RuntimeException("Class is not indexed " + clazz);
		}

		Pageable pageable = PageRequest.of(0, 20);

		return searchIndex(clazz, shouldMatch, searchQuery, pageable);
	}

	@Override
	@Transactional(readOnly = true)
	public <T extends EmptyModel> Map<Class<? extends EmptyModel>, List<? extends EmptyModel>> search(QueryBuilder shouldMatch, String searchQuery, Set<Class<? extends EmptyModel>> clazzes) {

		final Set<String> indexNames = clazzes.stream().filter(clazz -> indexedEntities.contains(clazz)).map(clazz -> toIndexName(clazz) + INDEX_READ).collect(Collectors.toSet());
		if (indexNames.size() == 0) {
			LOG.warn("No clazzes to search");
			return null;
		}
		LOG.debug("Searching {} in indices {}", clazzes, indexNames);

		Map<Class<? extends EmptyModel>, List<? extends EmptyModel>> hits = new HashMap<>();

		Pageable pageable = PageRequest.of(0, 20);

		for (Class<? extends EmptyModel> clazz : clazzes) {
			hits.put(clazz, searchIndex(clazz, shouldMatch, searchQuery, pageable));
		}

		return hits;
	}

	private <T extends EmptyModel> List<T> searchIndex(Class<T> clazz, QueryBuilder shouldMatch, String searchQuery, Pageable pageable) {
		String indexName = toIndexName(clazz) + INDEX_READ;
		LOG.debug("Searching {} in index {}", clazz, indexName);

		BoolQueryBuilder theQuery = boolQuery()
		/*@formatter:off*/
				.should(multiMatchQuery(searchQuery, "_all")
					.field("crop", 0.2f)
					.field("code", 1.5f)
					.field("name", 1.5f)
					.field("title", 1.4f)
					.fuzziness(Fuzziness.ONE)
					.boost(1.2f)
				)
				.should(multiMatchQuery(searchQuery, "code", "name", "title", "description")
					.field("crop", 0.2f)
					.field("code", 1.5f)
					.field("name", 1.5f)
					.field("title", 1.4f)
					.field("description", 0.8f)
					.fuzziness(Fuzziness.AUTO)
					.boost(2.0f)
				)
				.should(queryStringQuery(searchQuery)
					.lenient(true)
					.tieBreaker(1.0f) // was .useDisMaX()
					.fuzziness(Fuzziness.TWO)
					.boost(0.9f)
				);
		/*@formatter:on*/

		if (shouldMatch != null) {
			theQuery.should(shouldMatch);
		}

		SearchQuery query = new NativeSearchQueryBuilder()
		/*@formatter:off*/
				.withIndices(indexName)
				.withTypes(COMMON_TYPE_NAME)
				.withFields("id", "_class")
				.withQuery(theQuery)
				.withPageable(pageable)
				.build();
		/*@formatter:on*/

		if (!indexedEntities.contains(clazz)) {
			throw new RuntimeException("Class is not indexed: " + clazz);
		}

		return elasticsearchRestTemplate.query(query, (response) -> {

			LOG.debug("Search response: {}", response.status());

			List<T> entities = new LinkedList<>();

			response.getHits().forEach(searchHit -> {

				LOG.debug("Hit score={} id={} _class={} _source={}", searchHit.getScore(), searchHit.getId(), clazz, searchHit.getSourceAsString());

				T entity = loadEntity(clazz, Long.parseLong(searchHit.getId()));

				if (entity != null) {
					// Filter things
					if (entity instanceof Publishable) {
						if (((Publishable) entity).isPublished()) {
							entities.add(entity);
						}
					} else if (entity instanceof VersionedModel) {
						if (((VersionedModel) entity).isActive()) {
							entities.add(entity);
						}
					} else {
						entities.add(entity);
					}
				}
			});

			return entities;
		});
	}

	private <T extends EmptyModel> T loadEntity(Class<T> clazz, Long id) {
		return (T) em.find(clazz, id);
	}

	@Override
	public TermResult termStatistics(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String term) throws SearchException {
		return termStatistics(clazz, filters, size, new String[] { term }).get(term);
	}

	@Override
	public Map<String, TermResult> termStatistics(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String... terms) throws SearchException {
		if (!indexedEntities.contains(clazz)) {
			throw new RuntimeException("Class is not indexed " + clazz);
		}
		Long total = count(clazz, filters);

		String indexName = toIndexName(clazz) + INDEX_READ;
		SearchSourceBuilder searchSourceBuilder = makeSearchSourceBuilder(clazz, toEsQuery(clazz, filters));

		for (String term : terms) {
			TermsAggregationBuilder aggregation = AggregationBuilders.terms(term).field(term).size(size).order(BucketOrder.count(false));
			searchSourceBuilder.aggregation(aggregation);
		}

		SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(searchSourceBuilder);
		LOG.debug("termStatistics ES query {}", searchRequest);
		SearchResponse response = search(searchRequest, RequestOptions.DEFAULT);

		Map<String, Aggregation> results = response.getAggregations().asMap();
		for (var entry : results.entrySet()) {
			LOG.debug("FYI, ES results {} for {} of {}", entry.getKey(), terms, entry.getValue().getClass());
		}

		Map<String, TermResult> termResults = new HashMap<>();

		for (String term : terms) {
			Aggregation agg = results.get(term);

			if (agg instanceof ParsedTerms) {
				ParsedTerms topCounts = (ParsedTerms) agg;

				List<? extends Terms.Bucket> buckets = topCounts.getBuckets();

				TermResult tr = new TermResult(term, total, buckets.stream().map(bucket -> new Term(bucket.getKeyAsString(), bucket.getDocCount())).collect(Collectors.toList()),
						topCounts.getSumOfOtherDocCounts());

				termResults.put(term, tr);
			} else if (agg instanceof UnmappedTerms) {
				UnmappedTerms unmapped = (UnmappedTerms) agg;
				throw new RuntimeException("Unmapped term " + term + ": " + unmapped.getBuckets());
			} else {
				// Doesn't happen
				throw new RuntimeException("IDK for term " + term + ": " + agg.getClass());
			}
		}

		return termResults;
	}

	/**
	 * Runs TermFacet, but will automatically increase size if #otherCount is more
	 * than 10%
	 */
	@Override
	public TermResult termStatisticsAuto(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String term) throws SearchException {
		return termStatisticsAuto(clazz, filters, size, new String[] { term }).get(term);
	}

	@Override
	public Map<String, TermResult> termStatisticsAuto(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String... terms) throws SearchException {

		Map<String, TermResult> termResult = termStatistics(clazz, filters, size, terms);

		// TBD

		return termResult;
	}

	@Override
	public long count(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filter) throws SearchException {
		String indexName = toIndexName(clazz) + INDEX_READ;
		SearchSourceBuilder searchSourceBuilder = makeSearchSourceBuilder(clazz, toEsQuery(clazz, filter, null));
		searchSourceBuilder.size(0);

		SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(searchSourceBuilder);
		SearchResponse hits = search(searchRequest, RequestOptions.DEFAULT);
		return hits.getHits().getTotalHits();
	}

	@Override
	public Map<String, Long> countMissingValues(final Class<? extends EmptyModel> indexClass, final EmptyModelFilter<?, ?> filter) throws SearchException {
		final Map<String, Long> results = new HashMap<>();
		final Set<String> fields = jsonSchemas.get(indexClass);
		if (CollectionUtils.isEmpty(fields)) {
			return results;
		}

		final String indexName = toIndexName(indexClass) + INDEX_READ;
		SearchSourceBuilder sourceBuilder = makeSearchSourceBuilder(indexClass, toEsQuery(indexClass, filter)); 
		sourceBuilder.size(0);

		for (String fieldName: fields) {
			sourceBuilder.aggregation(AggregationBuilders.missing(fieldName).field(fieldName));
		}
		SearchRequest esQuery = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);

		final SearchResponse response = search(esQuery, RequestOptions.DEFAULT);
		LOG.debug("Counting missing values took {}s", response.getTook().getSeconds());
		for (Aggregation agg: response.getAggregations()) {
			long missingCount = ((InternalMissing) agg).getDocCount();
			if (missingCount > 0) {
				results.put(agg.getName(), missingCount);
			}
		}
		results.put("_totalCount", response.getHits().getTotalHits());
		return results;
	}

	@Override
	public TreeNode treeNodeStatistics(final Class<? extends EmptyModel> clazz, final EmptyModelFilter<?, ?> filters, final String[] terms) throws SearchException {
		var indexName = toIndexName(clazz) + INDEX_READ;

		var iterator = new ReverseListIterator<>(List.of(terms));
		TermsAggregationBuilder aggregation = null;
		while (iterator.hasNext()) {
			String groupBy = iterator.next();
			TermsAggregationBuilder agg = AggregationBuilders.terms(groupBy).field(groupBy).size(10);

			if (aggregation != null) {
				agg.subAggregation(aggregation);
			}
			aggregation = agg;
		}

		SearchSourceBuilder searchSourceBuilder = makeSearchSourceBuilder(clazz, toEsQuery(clazz, filters));
		searchSourceBuilder.size(0).aggregation(aggregation);

		SearchRequest esQuery = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(searchSourceBuilder);
		SearchResponse response = search(esQuery, RequestOptions.DEFAULT);

		ParsedTerms rootAggregation = response.getAggregations().get(terms[0]);
		TreeNode res = new TreeNode(rootAggregation.getName(), terms[0], response.getHits().getTotalHits(), null, filters);
		res.children = findChildren( rootAggregation, response.getHits().getTotalHits());
		return res;
	}

	private List<TreeNode> findChildren(final ParsedTerms aggregation, final long totalDocCount) {
		List<? extends Bucket> buckets = aggregation.getBuckets();
		List<TreeNode> children = new ArrayList<>(buckets.size());

		if (! buckets.isEmpty()) {
			long otherDocCounts = aggregation.getSumOfOtherDocCounts();
			long childrenDocCount = 0;

			for (Bucket bucket : buckets) {
				if (children.size() < 6 && ((float) bucket.getDocCount() / (totalDocCount - otherDocCounts) * 100) < 10) {
					otherDocCounts += bucket.getDocCount();
					// ignore results less than 10% of the total count
					continue;
				}

				// build a filter
				ObjectNode filter = objectMapper.createObjectNode();
				filter.set(aggregation.getName(), objectMapper.createArrayNode().add(bucket.getKey().toString()));

				TreeNode child = new TreeNode(aggregation.getName(), bucket.getKey().toString(), bucket.getDocCount(), null, filter);
				if (bucket.getAggregations().iterator().hasNext()) {
					child.children = findChildren((ParsedTerms) bucket.getAggregations().asList().get(0), bucket.getDocCount());
				}
				children.add(child);
				childrenDocCount += bucket.getDocCount();
			}

			if (otherDocCounts > 0) {
				// build a filter
				ArrayNode notValues = objectMapper.createArrayNode();
				children.forEach(ch -> notValues.add(ch.getName()));
				ObjectNode not = objectMapper.createObjectNode().set(aggregation.getName(), notValues);
				ObjectNode filter = objectMapper.createObjectNode().set("NOT", not);

				TreeNode other = new TreeNode(aggregation.getName(), "_OTHER", otherDocCounts, null, filter);
				children.add(other);
			}

			// Handle missing
			childrenDocCount += otherDocCounts;
			if (childrenDocCount < totalDocCount) {
				// build a filter
				ObjectNode filter = objectMapper.createObjectNode().set("NULL", objectMapper.createArrayNode().add(aggregation.getName()));

				TreeNode other = new TreeNode(aggregation.getName(), "_MISSING", totalDocCount - childrenDocCount, null, filter);
				children.add(other);
			}
		}
		return children;
	}

	@Override
	@Transactional(readOnly = true)
	public <T extends EmptyModel> List<T> find(Class<T> clazz, EmptyModelFilter<?, ?> filter) throws SearchException {
		String indexName = toIndexName(clazz) + INDEX_READ;
		List<T> results = new LinkedList<>();
		SearchRequest esRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(new SearchSourceBuilder().query(toEsQuery(clazz, filter)));
		search(esRequest, RequestOptions.DEFAULT)
			// map
			.getHits().forEach(hit -> {
				LOG.trace("Mapping {} id={}", clazz, hit.getId());
				T x = loadEntity(clazz, Long.parseLong(hit.getId()));
				if (x != null) {
					LOG.trace("Adding to results: {}", x);
					results.add(x);
				} else {
					LOG.trace("Got null");
				}
			});

		return results;
	}

	@Override
	@Transactional(readOnly = true)
	public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Pageable page) throws SearchException {
		return findAll(clazz, toEsQuery(clazz, filter, null), page, null);
	}

	@Override
	@Transactional(readOnly = true)
	public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Predicate predicate, Pageable page) throws SearchException {
		return findAll(clazz, toEsQuery(clazz, filter, predicate), page, null);
	}

	@Override
	@Transactional(readOnly = true)
	public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Predicate predicate, Pageable page,
			IEntityLoader<T> entityLoader, String... boostFields) throws SearchException {
		return findAll(clazz, toEsQuery(clazz, filter, predicate), page, entityLoader);
	}

	@Override
	public Number[][] getAccessionGeoBounds(EmptyModelFilter<?, ?> filter) throws SearchException {

		String indexName = toIndexName(Accession.class) + INDEX_READ;
		QueryBuilder queryBuilder = toEsQuery(Accession.class, filter);

		SearchSourceBuilder sourceBuilder = makeSearchSourceBuilder(Accession.class, queryBuilder);

		var maxLatitude = AggregationBuilders.max("max_latitude").field("latitude");
		var minLatitude = AggregationBuilders.min("min_latitude").field("latitude");
		var maxLongitude = AggregationBuilders.max("max_longitude").field("longitude");
		var minLongitude = AggregationBuilders.min("min_longitude").field("longitude");

		sourceBuilder.aggregation(maxLatitude);
		sourceBuilder.aggregation(minLatitude);
		sourceBuilder.aggregation(maxLongitude);
		sourceBuilder.aggregation(minLongitude);

		SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);

		SearchResponse response = search(searchRequest, RequestOptions.DEFAULT);
		Max maxLatitudeResult = response.getAggregations().get("max_latitude");
		Min minLatitudeResult = response.getAggregations().get("min_latitude");
		Max maxLongitudeResult = response.getAggregations().get("max_longitude");
		Min minLongitudeResult = response.getAggregations().get("min_longitude");

		return new Number[][] { new Number[] { minLatitudeResult.getValue(), maxLongitudeResult.getValue() }, new Number[] { maxLatitudeResult.getValue(), minLongitudeResult.getValue() } };
	}

	@Override
	@Transactional(readOnly = true)
	public Map<String, List<? extends EmptyModel>> fullTextSearch(String text) throws SearchException {
		final String TOP_HITS_AGG = "top_five_hits";
		final String TERMS_AGG = "index_names";
		final String allIndexNames = indexedEntities.stream().map(c -> toIndexName(c) + INDEX_READ).collect(Collectors.joining(","));

		TermsAggregationBuilder agg = AggregationBuilders.terms(TERMS_AGG).field("_index").size(indexedEntities.size())
				.subAggregation(AggregationBuilders.topHits(TOP_HITS_AGG).sort("_score", SortOrder.DESC).size(10));

		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(getFullText(text));
		sourceBuilder.sort("_score", SortOrder.DESC).aggregation(agg);

		SearchRequest searchRequest = new SearchRequest(allIndexNames).types(COMMON_TYPE_NAME).source(sourceBuilder);
		SearchResponse response = search(searchRequest, RequestOptions.DEFAULT);

		ParsedStringTerms aggregation = response.getAggregations().get(TERMS_AGG);
		List<? extends Bucket> buckets = aggregation.getBuckets();

		CaseInsensitiveMap<String, Class<EmptyModel>> caseInsensitiveMap = new CaseInsensitiveMap<>(namesToClasses);

		Map<String, List<? extends EmptyModel>> result = new LinkedHashMap<>(); // save order
		for (Bucket bucket : buckets) {
			TopHits topHitsAgg = bucket.getAggregations().get(TOP_HITS_AGG);
			SearchHits hits = topHitsAgg.getHits();

			String className = bucket.getKey().toString().split("[0-9]")[0];
			Class<EmptyModel> clazz = caseInsensitiveMap.get(className);

			List<EmptyModel> content = new LinkedList<>();
			for (SearchHit hit : hits.getHits()) {
				LOG.debug("Mapping {} id={} score={}", clazz.getSimpleName(), hit.getId(), hit.getScore());
				EmptyModel x = loadEntity(clazz, Long.parseLong(hit.getId()));
				if (x != null) {
					LOG.trace("Adding to results: {}", x);
					content.add(x);
				} else {
					LOG.trace("Got null");
				}
			}

			List<EmptyModel> filteredContent = content.stream().filter(record -> {
				if (record instanceof Publishable) {
					return ((Publishable) record).isPublished();
				}
				if (record instanceof AclAwareModel) {
					return SecurityContextUtil.hasPermission(record, BasePermission.READ);
				}
				return true;
			}).limit(5).collect(Collectors.toList());

			if (!filteredContent.isEmpty()) {
				result.put(clazz.getSimpleName(), filteredContent);
			}
		}

		return result;
	}

	private <T extends EmptyModel> Page<T> findAll(Class<T> clazz, QueryBuilder query, Pageable page, ElasticsearchService.IEntityLoader<T> entityLoader) throws SearchException {
		String indexName = toIndexName(clazz) + INDEX_READ;

		if (page.getOffset() > 100000) {
			LOG.warn("Large offset={} for ES query", page.getOffset());
		}

		SearchSourceBuilder sourceBuilder = makeSearchSourceBuilder(clazz, query);
		sourceBuilder.from((int) page.getOffset()).size(page.getPageSize());

		SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
		LOG.debug("findAll ES query {}", searchRequest);
		SearchResponse response = search(searchRequest, RequestOptions.DEFAULT);

		List<T> content = new LinkedList<>();

		if (entityLoader != null) {
			// Use custom entity loader
			content.addAll(entityLoader.loadEntities(StreamSupport.stream(response.getHits().spliterator(), false).map(searchHit -> {
				if (LOG.isTraceEnabled()) {
					LOG.trace("Hit score={} id={} _class={} _source={}", searchHit.getScore(), searchHit.getId(), clazz, searchHit.getSourceAsString());
				}
				return Long.parseLong(searchHit.getId());
			}).collect(Collectors.toList())));
		} else {
			// Use simple entityManager loader
			response.getHits().forEach(hit -> {
				LOG.debug("Mapping {} id={} score={}", clazz, hit.getId(), hit.getScore());
				T x = loadEntity(clazz, Long.parseLong(hit.getId()));
				if (x != null) {
					LOG.trace("Adding to results: {}", x);
					content.add(x);
				} else {
					LOG.trace("Got null");
				}
			});
		}

		return new PageImpl<>(content, page, response.getHits().getTotalHits());
	}

	private SearchSourceBuilder makeSearchSourceBuilder(Class<? extends EmptyModel> clazz, QueryBuilder query) throws SearchException {

		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(query);
		sourceBuilder.sort("_score", SortOrder.DESC);

//		String indexName = toIndexName(clazz) + INDEX_READ;
//		SearchRequest esRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
//		SearchResponse response = search(esRequest, RequestOptions.DEFAULT);

//		float maxScore = response.getHits().getMaxScore();
//		float minScore = maxScore * 0.5f;

//		sourceBuilder = new SearchSourceBuilder().query(query);
//		sourceBuilder.sort("_score", SortOrder.DESC);
//		sourceBuilder.minScore(minScore);
		return sourceBuilder;
	}

	@Override
	@Transactional(readOnly = true)
	public <T extends EmptyModel> void process(Class<T> clazz, EmptyModelFilter<?, ?> filter, IBatchAction<T> action, Long maxSize) throws Exception, SearchException {
		processById(clazz, filter, (ids) -> {
			List<T> loadedEntities = ids.stream().map(id -> loadEntity(clazz, id)).filter(entity -> entity != null).collect(Collectors.toList());
			if (CollectionUtils.isNotEmpty(loadedEntities)) {
				action.apply(loadedEntities);
			}
		}, maxSize == null ? Pageable.unpaged() : Pageable.ofSize(maxSize.intValue()));
	}

	@Override
	public <T extends EmptyModel> void processById(Class<T> clazz, EmptyModelFilter<?, ?> filter, IBatchAction<Long> action, @NonNull Pageable page) throws Exception, SearchException {

		String indexName = toIndexName(clazz) + INDEX_READ;

		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(toEsQuery(clazz, filter));
		sourceBuilder.sort(SortBuilders.fieldSort("id").order(SortOrder.ASC));

		// This doesn't work: sourceBuilder.from((int) page.getOffset()).size(page.getPageSize());
		// Result window is too large, from + size must be less than or equal to: [10000]
		// Need to use scrolling.
		sourceBuilder.size(10000);

		TimeValue scrollTimeout = TimeValue.timeValueSeconds(30);
		SearchRequest esQuery = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder).scroll(scrollTimeout);

		SearchResponse searchResponse = search(esQuery, RequestOptions.DEFAULT);
		SearchHits hits = searchResponse.getHits();
		LOG.info("Got total {} hits from ES", hits.getTotalHits());
		if (page.isPaged() && hits.getTotalHits() < page.getOffset()) {
			LOG.warn("ES total hits {} are less than the requested offset {}", hits.getTotalHits(), page.getOffset());
			return;
		}
		AtomicLong counter = new AtomicLong();

		List<Long> content = new LinkedList<>(); // Contains requested page of IDs
		while (hits.getHits().length > 0) {
			LOG.debug("Processing {} hits", hits.getHits().length);

			if (page.isPaged() && page.getOffset() > counter.get() + hits.getHits().length) { // Offset is bigger than entire hits window
				counter.addAndGet(hits.getHits().length); // Just add the number of hits and don't read them
				LOG.trace("Skipped {} for offset {}", counter.get(), page.getOffset());
			} else {
				// Skip what needs skipping
				for (var hit : searchResponse.getHits()) {
					if (page.isUnpaged() || counter.incrementAndGet() > page.getOffset()) { // We're in the offset window
						if (page.isUnpaged() || counter.get() <= page.getOffset() + page.getPageSize()) { // And we have not yet processed the requested page
							content.add(Long.parseLong(hit.getId())); // Add ID to results
						} else {
							break;
						}
					}
				}
			}

			if (! content.isEmpty()) {
				LOG.debug("Processing {} IDs from ES", content.size());
				action.apply(content);
				content.clear();
			}

			if (page.isPaged() && counter.get() >= page.getOffset() + page.getPageSize()) { // We're done processing the requested page
				return;
			}

			// Scroll next
			String scrollId = searchResponse.getScrollId();
			searchResponse = client.scroll(new SearchScrollRequest(scrollId).scroll(scrollTimeout), RequestOptions.DEFAULT);
			hits = searchResponse.getHits();
			LOG.debug("Got scroll hits {}", hits.getTotalHits());
		}
	}

	@Override
	public List<Double[]> distinctCoordinates(Predicate filt, String _text) throws SearchException {

		ElasticQueryBuilder esQb = new ElasticQueryBuilder();
		if (filt != null) {
			filt.accept(esQb, null);
		}
		QueryBuilder esQuery = esQb.getQuery();

		if (StringUtils.isNotBlank(_text)) {
			BoolQueryBuilder fulltext = getFullText(_text, AccessionFilter.ES_BOOSTED_FIELDS);

			if (esQuery instanceof BoolQueryBuilder) {
				esQuery = ((BoolQueryBuilder) esQuery).must(fulltext);
			} else {
				BoolQueryBuilder builder = new BoolQueryBuilder();
				esQuery = builder.filter(esQuery).must(fulltext);
			}
		}

		TimeValue scrollTimeout = TimeValue.timeValueSeconds(30);

		String indexName = toIndexName(Accession.class) + INDEX_READ;
		SearchSourceBuilder searchSourceBuilder = makeSearchSourceBuilder(Accession.class, esQuery);
		searchSourceBuilder
				.storedField(ACCESSION_GEO_LATITUDE)
				.storedField(ACCESSION_GEO_LONGITUDE) // We only want lat+lon
				.sort(SortBuilders.fieldSort("_doc"))
				.size(1000); // 10000 is the max allowed by ES
		SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(searchSourceBuilder).scroll(scrollTimeout);

		SearchResponse searchResponse = search(searchRequest, RequestOptions.DEFAULT);
		SearchHits hits = searchResponse.getHits();
//		System.err.println("Got hits " + hits.getTotalHits());
		
		Set<Point> coordinates = Sets.newHashSet();

		while (hits.getHits().length > 0) {
			LOG.trace("Processing {} hits ", hits.getHits().length);
			hits.forEach(hit -> {
				Map<String, DocumentField> fields = hit.getFields();
	//			fields.values().forEach(hitField -> System.err.print(" " + hitField.getValue()));
	//			System.err.println();
				coordinates.add(new Point(
					((Number) fields.get(ACCESSION_GEO_LONGITUDE).getValue()).doubleValue(),
					((Number) fields.get(ACCESSION_GEO_LATITUDE).getValue()).doubleValue()
				));
			});
			
			String scrollId = searchResponse.getScrollId();
			try {
				searchResponse = client.scroll(new SearchScrollRequest(scrollId).scroll(scrollTimeout), RequestOptions.DEFAULT);
			} catch (IOException e) {
				LOG.warn("Error occurred during scroll search", e);
				throw new SearchException(e.getMessage(), e.getCause());
			}

			hits = searchResponse.getHits();
			LOG.trace("Got {} scroll hits with scrollId={}", hits.getTotalHits(), scrollId);
		}

//		LOG.debug("Done scrolling");
		return coordinates.stream().map(point -> new Double[] { point.getX(), point.getY() }).collect(Collectors.toList());
	}

	private QueryBuilder toEsQuery(Class<?> clazz, EmptyModelFilter<?, ?> filters) {
		return toEsQuery(clazz, filters, null);
	}

	private QueryBuilder toEsQuery(Class<?> clazz, EmptyModelFilter<?, ?> filters, Predicate predicate) {

		ElasticQueryBuilder esQb = new ElasticQueryBuilder();
		if (filters != null) {
			BooleanBuilder builder = new BooleanBuilder();
			if (predicate != null) {
				builder.and(predicate);
			}
			builder.and(filters.buildPredicate()).accept(esQb, clazz);
		}
		QueryBuilder esQuery = esQb.getQuery();

		if (filters instanceof IFullTextFilter) {
			String text = ((IFullTextFilter) filters).get_text();
			if (StringUtils.isNotBlank(text)) {
				BoolQueryBuilder fulltext = getFullText(text, filters.boostedFields());

				if (esQuery instanceof BoolQueryBuilder) {
					esQuery = ((BoolQueryBuilder) esQuery).must(fulltext);
				} else {
					BoolQueryBuilder builder = new BoolQueryBuilder();
					esQuery = builder.filter(esQuery).must(fulltext);
				}
			}
		}

		try {
			if (LOG.isDebugEnabled()) {
				LOG.debug("Converted {} to\ncurl -XGET 'localhost:???/accession_read/_search?pretty' -H 'Content-type: application/json' -d '\n{ \"query\": \n{} }\n'", new ObjectMapper().writeValueAsString(filters), esQuery);
			}
		} catch (JsonProcessingException e) {
		}
		return esQuery;
	}

	private BoolQueryBuilder getFullText(String text, String... boostFields) {
		
		var charsToEscape = RESERVED_CHARACTERS.stream().filter(text::contains).collect(Collectors.toSet());
		for (String charToEscape : charsToEscape) {
			text = text.replace(charToEscape, "\\" + charToEscape);
		}
		
		BoolQueryBuilder fulltext = boolQuery()
		/*@formatter:off*/
			.should(multiMatchQuery(text, "_texts")
				.fuzziness(Fuzziness.AUTO)
				// .minimumShouldMatch("75%")
				.boost(2.0f)
			)
			.should(queryStringQuery(text)
				.lenient(true)
				.tieBreaker(1.0f) // was .useDisMaX()
				.fuzziness(Fuzziness.AUTO)
				.boost(0.9f)
			);
		/*@formatter:on*/

		{
			SimpleQueryStringBuilder sqsQuery = simpleQueryStringQuery(text)
			/*@formatter:off*/
				.lenient(true)
				.defaultOperator(Operator.AND)
				.field("*")
				.boost(1.0f)
				// .minimumShouldMatch("75%")
			/*@formatter:on*/
			;

			sqsQuery.field("_texts", 2);
			if (boostFields != null && boostFields.length > 0) {
				for (String field : boostFields) {
					sqsQuery.field(field, 3);
				}
			}

			fulltext.should(sqsQuery);
		}

		fulltext.minimumShouldMatch("70%");
		return fulltext;
	}

	private SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws SearchException {
		try {
			return client.search(searchRequest, options);
		} catch (Throwable e) {
			LOG.error("Error occurred during search", e);
			throw new SearchException(e.getMessage(), e.getCause());
		}
	}
}