ElasticsearchServiceImpl.java
/*
* Copyright 2022 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.server.service.impl;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.multiMatchQuery;
import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery;
import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.collections4.iterators.ReverseListIterator;
import org.apache.commons.collections4.map.CaseInsensitiveMap;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SimpleQueryStringBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.UnmappedTerms;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCount;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.genesys.blocks.model.EmptyModel;
import org.genesys.blocks.model.Publishable;
import org.genesys.blocks.model.VersionedModel;
import org.genesys.blocks.model.filters.EmptyModelFilter;
import org.genesys.blocks.security.SecurityContextUtil;
import org.genesys.blocks.security.model.AclAwareModel;
import org.genesys.custom.elasticsearch.CustomMapping;
import org.genesys.server.api.model.Api1Constants;
import org.genesys.server.component.elastic.ElasticLoader;
import org.genesys.server.component.elastic.ElasticQueryBuilder;
import org.genesys.server.component.elastic.ElasticReindex;
import org.genesys.server.exception.SearchException;
import org.genesys.server.model.genesys.Accession;
import org.genesys.server.service.AccessionService.IBatchAction;
import org.genesys.server.service.ElasticsearchService;
import org.genesys.server.service.filter.AccessionFilter;
import org.genesys.server.service.filter.IFullTextFilter;
import org.genesys.spring.config.ElasticsearchConfig.GenesysEntityMapper;
import org.genesys.spring.config.HazelcastConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.EntityMapper;
import org.springframework.data.elasticsearch.core.query.AliasQuery;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import org.springframework.data.elasticsearch.core.query.FetchSourceFilter;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.data.elasticsearch.core.query.SourceFilter;
import org.springframework.data.geo.Point;
import org.springframework.data.jpa.repository.support.Querydsl;
import org.springframework.data.querydsl.SimpleEntityPathResolver;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.acls.domain.BasePermission;
import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import com.fasterxml.jackson.module.jsonSchema.types.ObjectSchema;
import com.google.common.collect.Sets;
import com.querydsl.core.BooleanBuilder;
import com.querydsl.core.types.EntityPath;
import com.querydsl.core.types.Predicate;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.core.types.dsl.PathBuilderFactory;
import com.querydsl.core.types.dsl.SetPath;
import com.querydsl.jpa.JPQLQuery;
import lombok.NonNull;
/**
* Manage Elasticsearch indexing, indices.
*
* @author Matija Obreza
*/
// Not @Service
@Transactional(readOnly = true)
public class ElasticsearchServiceImpl implements ElasticsearchService, InitializingBean {
private static final String ACCESSION_GEO_LONGITUDE = Api1Constants.Accession.LONGITUDE;
private static final String ACCESSION_GEO_LATITUDE = Api1Constants.Accession.LATITUDE;
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchServiceImpl.class);
private static final String INDEX_READ = "_read";
private static final String INDEX_WRITE = "_write";
private static final String ALIAS_EVERYTHING = "Everything";
private static final String COMMON_TYPE_NAME = "data";
@Value("${elasticsearch.cluster.nodes}")
private String esClusterNodes;
private static final ObjectMapper objectMapper = new ObjectMapper();
/** The Constant DEFAULT_SOURCE_FILTER. */
protected static final SourceFilter DEFAULT_SOURCE_FILTER = new FetchSourceFilter(new String[] { "id", "_class", "title", "code", "description" }, new String[] {});
protected static final Set<String> RESERVED_CHARACTERS = Set.of("/", "+", "-", "=", "&&", "||", "!", "(", ")", "{", "}", "[", "]", "^", "~", "*", "?", ":");
@Resource
private BlockingQueue<ElasticReindex> elasticReindexQueue;
@Resource
@Qualifier("elasticsearchReindexLock")
private Lock elasticsearchReindexLock;
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private EntityManager em;
@Autowired
private ElasticsearchOperations elasticsearchRestTemplate;
@Autowired
private RestHighLevelClient client;
@Autowired
private EntityMapper mapper;
@Autowired
private ElasticsearchService _self;
private final Set<Class<? extends EmptyModel>> indexedEntities = Collections.synchronizedSet(new HashSet<>());
private final Map<String, Class<EmptyModel>> namesToClasses = Collections.synchronizedMap(new HashMap<>());
private final Map<Class<? extends EmptyModel>, Set<String>> jsonSchemas = new HashMap<>();
private final Map<Class<? extends EmptyModel>, Integer> reindexBatchSize = new HashMap<>();
/// Size of database batch scan for IDs
private int batchSize = 1000;
@Resource
@Qualifier("clusterFlags")
private Map<String, Boolean> clusterFlags;
@Override
public void afterPropertiesSet() throws Exception {
if (withElasticsearch()) {
for (Class<? extends EmptyModel> clazz : indexedEntities) {
final String writeIndex = getFirstIndexForAlias(toIndexName(clazz) + INDEX_WRITE);
if (writeIndex == null) {
ensureWriteAlias(clazz);
} else {
LOG.info("Updating write index {} for {}", writeIndex, clazz.getName());
Object mapping = CustomMapping.springDataMapping(elasticsearchRestTemplate.getElasticsearchConverter(), clazz, COMMON_TYPE_NAME);
try {
elasticsearchRestTemplate.putMapping(writeIndex, COMMON_TYPE_NAME, mapping);
} catch (Throwable e) {
LOG.error("The ES mapping is no longer compatible for index={} of {}. Please regenerate.", writeIndex, clazz);
}
}
}
} else {
LOG.warn("Elasticsearch not accessible, not updating mappings");
}
JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(((GenesysEntityMapper) mapper).getObjectMapper());
for (Class<? extends EmptyModel> clazz: indexedEntities) {
try {
JsonSchema schema = schemaGen.generateSchema(clazz);
jsonSchemas.put(clazz, buildJsonPaths(((ObjectSchema) schema).getProperties(), null));
} catch (Throwable e) {
LOG.error("The list of all {} fields is not created.", clazz.getSimpleName(), e);
}
}
}
@Override
public long waitForCount(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filter, int mustHaveCount) throws SearchException {
long count = 0;
int repeats = 0;
do {
try {
var readIndexName = getFirstIndexForAlias(toIndexName(clazz) + INDEX_READ);
var writeIndexName = getFirstIndexForAlias(toIndexName(clazz) + INDEX_WRITE);
if (! Objects.equals(readIndexName, writeIndexName)) {
LOG.warn("Read/write index mismatch {}!={}.", readIndexName, writeIndexName);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
throw new ElasticsearchStatusException("Read/write index mismatch,", null); // This is caught later and will reset counts.
}
count = count(clazz, filter);
if (count != mustHaveCount) {
LOG.debug("ES count of {} is {}!={}", clazz.getName(), count, mustHaveCount);
if (repeats++ > 10) {
throw new RuntimeException("ES count did not settle in 10 retries.");
}
try {
var sleepTime = (repeats * 200L) + RandomUtils.nextLong((repeats) * 50L, (repeats + 1) * 100L);
LOG.warn("ES count #{} for {} is {}!={}, sleeping for {}ms...", repeats, clazz.getName(), count, mustHaveCount, sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
return count;
}
}
} catch (ElasticsearchStatusException e) {
count = -1;
repeats = 0;
LOG.warn("Error while waiting for count: {}. Retrying...", e.getMessage());
}
} while (count != mustHaveCount);
return count;
}
/**
* Makes a list of all JSON paths for indexed entity and all related types.
*
* @param properties properties
* @param parentPath parentPath
*/
private Set<String> buildJsonPaths(Map<String, JsonSchema> properties, String parentPath) {
if (MapUtils.isEmpty(properties)) {
return Collections.emptySet();
}
Set<String> fieldList = new HashSet<>();
Set<Map.Entry<String, JsonSchema>> entries = properties.entrySet();
entries.removeIf(e -> e.getKey().equals("_class") || e.getKey().equals("_permissions"));
for (var entry : entries) {
JsonSchema schema = entry.getValue();
String fullPath = StringUtils.isBlank(parentPath) ? entry.getKey() : parentPath + "." + entry.getKey();
if (schema instanceof ObjectSchema) {
fieldList.addAll(buildJsonPaths(((ObjectSchema)schema).getProperties(), fullPath));
} else {
fieldList.add(fullPath);
}
}
return fieldList;
}
/**
* Set the size of IDs loaded in a single batch when reindexing an entity.
* {@link #batchSize}.
*
* @param batchSize
*/
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
private boolean withElasticsearch() {
try {
return RestStatus.OK == client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT).status();
} catch (Throwable e) {
LOG.warn(e.getMessage());
return false;
}
}
@Override
public void reindexAll() {
for (Class<? extends EmptyModel> clazz : indexedEntities) {
if (Boolean.TRUE.equals(clusterFlags.get(HazelcastConfig.ClusterFlags.STOP_REINDEX_ALL.value)) || Boolean.TRUE.equals(clusterFlags.get(HazelcastConfig.ClusterFlags.STOP_REINDEX.value))) {
break;
}
reindex(clazz);
}
LOG.warn("Clearing STOP_REINDEX flags");
clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX.value, false);
clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX_ALL.value, false);
}
@Override
public void stopReindex() {
clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX.value, true);
}
@Override
public void stopReindexAll() {
LOG.warn("Setting STOP_REINDEX flags");
clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX.value, true);
clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX_ALL.value, true);
}
@Override
public void allowReindexAll() {
LOG.warn("Clearing STOP_REINDEX flags");
clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX.value, false);
clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX_ALL.value, false);
}
@Override
public TermResult recountResult(Class<? extends EmptyModel> clazz, SetPath<?, ?> setPath, EmptyModelFilter<?, ?> filter, TermResult toRecount, String termName) throws ExecutionException, InterruptedException, SearchException {
String indexName = toIndexName(clazz) + INDEX_READ;
QueryBuilder q1 = toEsQuery(clazz, filter, setPath.size().gt(0)); // non-empty []
QueryBuilder q2 = toEsQuery(clazz, filter, setPath.size().eq(0)); // empty []
ValueCountAggregationBuilder aggregation = AggregationBuilders.count(termName).field(termName);
SearchSourceBuilder sourceBuilder = makeSearchSourceBuilder(clazz, q1);
sourceBuilder.size(0).aggregation(aggregation);
SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
ValueCount termAgg = search(searchRequest, RequestOptions.DEFAULT).getAggregations().get(termName);
long setCount = termAgg.getValue();
sourceBuilder = makeSearchSourceBuilder(clazz, q2);
sourceBuilder.size(0).aggregation(aggregation);
SearchRequest entityEsQuery = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
long entityCount = search(entityEsQuery, RequestOptions.DEFAULT).getHits().getTotalHits();
return new TermResult(termName, setCount + entityCount, toRecount.getTerms(), toRecount.getOther());
}
@SuppressWarnings("unchecked")
@Override
public <R extends EmptyModel> void indexEntity(Class<R> clazz) {
synchronized (this) {
if (indexedEntities.contains(clazz)) {
LOG.warn("Entity {} already set for indexing.", clazz.getName());
return;
}
}
indexedEntities.add(clazz);
namesToClasses.put(clazz.getSimpleName(), (Class<EmptyModel>) clazz);
}
@Override
public <R extends EmptyModel> void indexEntity(Class<R> clazz, int reindexBatchSize) {
indexEntity(clazz);
this.reindexBatchSize.put(clazz, reindexBatchSize);
}
/**
* Sets the reindex batch size.
*
* @param <R> the generic type
* @param model the model
* @param batchSize the batch size
* @return the integer
*/
@Override
public <R extends EmptyModel> Integer setReindexBatchSize(Class<R> model, Integer batchSize) {
return this.reindexBatchSize.put(model, batchSize);
}
@Override
public List<Class<?>> getIndexedEntities() {
ArrayList<Class<?>> entities = new ArrayList<>(this.indexedEntities);
entities.sort(Comparator.comparing(Class::getName));
return ListUtils.unmodifiableList(entities);
}
@Override
public <R> void removeAll(Class<R> clazz) throws SearchException {
if (!indexedEntities.contains(clazz)) {
LOG.debug("Class {} is not indexed.", clazz.getName());
return;
}
if (!withElasticsearch()) {
LOG.warn("Elasticsearch not accessible, not reindexing {}", clazz.getName());
return;
}
LOG.info("Deleting all docs for {}", clazz.getName());
DeleteQuery dq = new DeleteQuery();
dq.setIndex(toIndexName(clazz) + INDEX_WRITE);
dq.setType(COMMON_TYPE_NAME);
dq.setQuery(new MatchAllQueryBuilder());
elasticsearchRestTemplate.delete(dq);
}
@Override
public <R> void reindex(Class<R> clazz) {
boolean isLockAcquired = elasticsearchReindexLock.tryLock();
if (isLockAcquired) {
try {
internalReindex(clazz);
} finally {
elasticsearchReindexLock.unlock();
}
} else {
throw new RuntimeException("Could not run reindex for " + clazz.getSimpleName() + ". Operation locked.");
}
}
private <R> void internalReindex(Class<R> clazz) {
if (!indexedEntities.contains(clazz)) {
LOG.warn("Class {} is not indexed.", clazz.getName());
return;
}
if (!withElasticsearch()) {
LOG.warn("Elasticsearch not accessible, not reindexing {}", clazz.getName());
return;
}
LOG.info("Reindexing {}", clazz.getName());
final String indexRoot = toIndexName(clazz);
// Figure out existing index name
String currentIndexName = getFirstIndexForAlias(indexRoot + INDEX_READ);
if (currentIndexName == null) {
currentIndexName = getFirstIndexForAlias(indexRoot + INDEX_WRITE);
}
// Make new index with timestamp
final String indexName = createIndexFor(clazz);
// The old index name
final String oldIndexName = currentIndexName;
// Scan
scanDatabase(clazz, null);
// Schedule rename
taskExecutor.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Fine...
}
try {
do {
var queueSize = elasticReindexQueue.size();
if (queueSize == 0) {
break;
}
LOG.trace("ES Reindex queue has {} elements, waiting to realias {}...", queueSize, indexName);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// Fine...
}
} while (true);
LOG.info("ES Reindex queue is empty, realiasing {}", indexName);
// Move _READ alias
realias(indexRoot + INDEX_READ, indexName);
// Add to EVERYTHING
deleteAlias(ALIAS_EVERYTHING, indexRoot + "*");
addAlias(ALIAS_EVERYTHING, indexName);
// delete old index
if (oldIndexName != null) {
LOG.info("Deleting old index {}", oldIndexName);
elasticsearchRestTemplate.deleteIndex(oldIndexName);
}
} catch (Throwable e) {
LOG.warn("Realiasing after full scan threw an exception: {}", e.getMessage(), e);
}
});
}
@Override
public <T extends EmptyModel> void reindex(Class<T> clazz, EmptyModelFilter<?, ?> filter) {
scanDatabase(clazz, filter);
}
@Override
public <T extends EmptyModel> void remove(Class<T> clazz, EmptyModelFilter<?, T> filter) throws SearchException {
String indexName = toIndexName(clazz) + INDEX_READ; // Remove from READ index
long esCount = count(clazz, filter);
LOG.warn("ES removing {} documents of {}", esCount, clazz.toString());
DeleteQuery dq = new DeleteQuery();
dq.setIndex(indexName);
dq.setType(COMMON_TYPE_NAME);
dq.setQuery(toEsQuery(clazz, filter));
elasticsearchRestTemplate.delete(dq);
}
/**
* Creates the index for the entity. It assigns it the _WRITE alias
*
* @param clazz the clazz
* @return the new index name
*/
private String createIndexFor(Class<?> clazz) {
final String indexRoot = toIndexName(clazz);
final String indexName = indexRoot + System.currentTimeMillis();
LOG.info("Creating new index {} for {}", indexName, clazz.getName());
/*@formatter:off*/
MapBuilder<String, Object> settingsBuilder = new MapBuilder<String, Object>()
// .put("index.blocks.read_only_allow_delete", null)
// .put("index.mapping.total_fields.limit", 1000)
// .put("index.number_of_shards", 1)
;
/*@formatter:on*/
if (! "embedded".equals(esClusterNodes)) {
// /*@formatter:off*/
// settingsBuilder
// // Analyzer
// .put("index.analysis.analyzer.default.tokenizer", "standard")
// .put("index.analysis.analyzer.default.filter.0", "asciifolding2")
// // Filter
// .put("index.analysis.filter.asciifolding2.type", "asciifolding")
// .put("index.analysis.filter.asciifolding2.preserve_original", true)
// ;
// /*@formatter:on*/
} else {
// Embedded ES
settingsBuilder
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0);
}
elasticsearchRestTemplate.createIndex(indexName, settingsBuilder.map());
Object mapping = CustomMapping.springDataMapping(elasticsearchRestTemplate.getElasticsearchConverter(), clazz, COMMON_TYPE_NAME);
elasticsearchRestTemplate.putMapping(indexName, COMMON_TYPE_NAME, mapping);
// Add _WRITE alias
realias(indexRoot + INDEX_WRITE, indexName);
return indexName;
}
private <R> void scanDatabase(Class<R> clazz, EmptyModelFilter<?, ?> filter) {
PathBuilder<R> builder = new PathBuilderFactory().create(clazz);
Querydsl querydsl = new Querydsl(em, builder);
EntityPath<R> entityPath = SimpleEntityPathResolver.INSTANCE.createPath(clazz);
PathBuilder<R> pathBuilder = new PathBuilder<R>(clazz, entityPath.getMetadata().getName());
JPQLQuery<Long> query = querydsl.createQuery(entityPath)
// select id only
.select(pathBuilder.getNumber("id", Long.class))
// and order by id
.orderBy(pathBuilder.getNumber("id", Long.class).asc());
if (filter != null) {
// apply filter
query.where(filter.buildPredicate());
}
Integer scanSize = reindexBatchSize.get(clazz);
final int customBatchSize = scanSize == null ? batchSize : scanSize.intValue();
int startPosition = 0;
query.offset(startPosition);
query.limit(customBatchSize);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
List<Long> results;
do {
stopWatch.split();
LOG.debug("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, customBatchSize);
if (startPosition > 10 * customBatchSize && startPosition / (10 * customBatchSize) == 0) {
LOG.info("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, customBatchSize);
}
results = query.fetch();
// Schedule indexing
elasticReindexQueue.addAll(results.stream().map((res) -> new ElasticReindex(clazz, res)).collect(Collectors.toList()));
// Next page
query.offset(startPosition += results.size());
} while (
results.size() > 0 // We have data
&& !Boolean.TRUE.equals(clusterFlags.get(HazelcastConfig.ClusterFlags.STOP_REINDEX.value)) // Not stopped
);
var stopReindex = Boolean.TRUE.equals(clusterFlags.get(HazelcastConfig.ClusterFlags.STOP_REINDEX.value));
LOG.debug("Checking for stop reindex value. StopReindex = {}", stopReindex);
if (stopReindex) {
elasticReindexQueue.clear();
clusterFlags.put(HazelcastConfig.ClusterFlags.STOP_REINDEX.value, false);
}
stopWatch.stop();
LOG.info("Scanning {} for reindex took {}ms", clazz.getName(), stopWatch.getTime());
}
/**
* Schedule a parallel update.
*
* @param clazz Entity to reindex
* @param entityIds Entity identifiers to reindex
*/
@Override
public <R> void asyncUpdate(final Class<R> clazz, final Collection<Long> entityIds) {
if (!indexedEntities.contains(clazz)) {
LOG.warn("Class {} is not indexed.", clazz.getName());
return;
}
final HashSet<Long> copy = new HashSet<>(entityIds);
if (copy.size() == 0) {
return;
}
taskExecutor.execute(() -> {
LOG.debug("Running scheduled reindex of {} size={}", clazz.getName(), copy.size());
try {
_self.update(clazz, copy);
} catch (Throwable e) {
LOG.error("Error updating {}: {}", clazz.getSimpleName(), e.getMessage(), e);
}
});
}
/**
* Will not modify the list of IDs
*/
@Override
public <R> void update(final Class<R> clazz, final Collection<Long> ids) {
if (!indexedEntities.contains(clazz)) {
LOG.warn("Class {} is not indexed.", clazz.getName());
return;
}
HashSet<Long> notFoundIds = new HashSet<>(ids);
final String indexName = toIndexName(clazz) + INDEX_WRITE;
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<R> cq = cb.createQuery(clazz);
Root<R> root = cq.from(clazz);
cq.where(root.get("id").in(ids));
TypedQuery<R> query = em.createQuery(cq);
List<R> results = query.getResultList();
List<IndexQuery> queries = new LinkedList<IndexQuery>();
Map<String, String> jsons = new HashMap<>();
for (R x : results) {
LOG.trace("Indexing {} {}", clazz.getName(), x);
EmptyModel bm = (EmptyModel) x;
if (x instanceof ElasticLoader) {
// Prepare entity for indexing (e.g. lazy-load)
((ElasticLoader) x).prepareForIndexing();
}
try {
jsons.put(bm.getId().toString(), mapper.mapToString(bm));
} catch (IOException e) {
e.printStackTrace();
}
LOG.trace("JSON: {}", jsons.get(bm.getId().toString()));
// is found
notFoundIds.remove(bm.getId());
IndexQuery iq = new IndexQuery();
iq.setIndexName(indexName);
iq.setType(COMMON_TYPE_NAME);
iq.setId(String.valueOf(bm.getId()));
iq.setObject(bm);
queries.add(iq);
}
if (!queries.isEmpty()) {
LOG.debug("Indexing {} count={} of provided objects count={}", clazz.getName(), queries.size(), ids.size());
try {
elasticsearchRestTemplate.bulkIndex(queries);
} catch (org.springframework.data.elasticsearch.ElasticsearchException e) {
LOG.error(e.getMessage(), e);
Map<String, String> failedDocs = e.getFailedDocuments();
if (failedDocs != null) {
for (var entry : failedDocs.entrySet()) {
LOG.error("{} {}\n{}", entry.getKey(), entry.getValue(), jsons.get(entry.getKey()));
}
}
} catch (ElasticsearchException e) {
LOG.error("Could not index document", e);
}
}
// for (R x : results) {
// // detach from EM
// em.detach(x);
// }
em.clear();
queries.clear();
for (Long id : notFoundIds) {
LOG.trace("Removing {} id={} from index {}/{}", clazz.getName(), id, indexName, COMMON_TYPE_NAME);
String res = elasticsearchRestTemplate.delete(indexName, COMMON_TYPE_NAME, String.valueOf(id));
LOG.trace("Deleted ES document id={}", res);
}
}
/**
* Checks that index "write" alias exists. When alias is not found, a new index
* is created and alias points to it.
*
* @param clazz for type mapping
*/
protected synchronized void ensureWriteAlias(Class<?> clazz) {
String indexRoot = toIndexName(clazz);
String aliasWrite = indexRoot + INDEX_WRITE;
if (!aliasExists(aliasWrite)) {
final String indexName = createIndexFor(clazz);
String aliasRead = indexRoot + INDEX_READ;
if (!aliasExists(aliasRead)) {
deleteAlias(ALIAS_EVERYTHING, indexRoot + "*");
addAlias(ALIAS_EVERYTHING, indexName);
addAlias(aliasRead, indexName);
}
}
}
/**
* Get index name for clazz
*
* @param clazz
* @return
*/
private String toIndexName(Class<?> clazz) {
return clazz.getSimpleName().toLowerCase();
}
private boolean aliasExists(final String aliasName) {
try {
return client.indices().existsAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);
} catch (IOException e) {
return false;
}
}
/**
* Make the alias point exclusively to the specified index
*
* @param aliasName The alias name
* @param indexName The index the alias points to
*/
@Override
@PreAuthorize("hasRole('ADMINISTRATOR')")
public void realias(final String aliasName, final String indexName) {
deleteAlias(aliasName);
addAlias(aliasName, indexName);
}
@Override
@PreAuthorize("hasRole('ADMINISTRATOR')")
public void addAlias(String aliasName, String indexName) {
final AliasQuery query = new AliasQuery();
query.setAliasName(aliasName);
query.setIndexName(indexName);
LOG.info("Adding alias {} to index {}", aliasName, indexName);
this.elasticsearchRestTemplate.addAlias(query);
}
private String getFirstIndexForAlias(final String aliasName) {
try {
GetAliasesResponse response = client.indices().getAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);
return response != null && !response.getAliases().isEmpty() ? (String) response.getAliases().keySet().toArray()[0] : null;
} catch (Throwable e) {
LOG.warn("Error while getting index for alias={}:", aliasName, e);
return null;
}
}
/**
* Delete alias.
*
* @param aliasName the alias name
*/
@Override
@PreAuthorize("hasRole('ADMINISTRATOR')")
public void deleteAlias(final String aliasName) {
try {
GetAliasesResponse response = client.indices().getAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);
if (response != null && !response.getAliases().isEmpty()) {
for (final String indexName: response.getAliases().keySet()) {
deleteAlias(aliasName, indexName);
}
}
} catch (IOException e) {
LOG.warn("Error while deleting alias={}:", aliasName, e);
}
}
@Override
@PreAuthorize("hasRole('ADMINISTRATOR')")
public void deleteIndex(final String indexName) {
elasticsearchRestTemplate.deleteIndex(indexName);
}
private void deleteAlias(String aliasName, String indexName) {
final AliasQuery query = new AliasQuery();
query.setAliasName(aliasName);
query.setIndexName(indexName);
try {
this.elasticsearchRestTemplate.removeAlias(query);
LOG.info("Removed alias {} from index {}", aliasName, indexName);
} catch (ElasticsearchStatusException | AliasesNotFoundException e) {
LOG.warn("Alias {} does not exist on index {}", aliasName, indexName);
}
}
@Override
public <T extends EmptyModel> List<T> search(QueryBuilder shouldMatch, String searchQuery, Class<T> clazz) {
if (!indexedEntities.contains(clazz)) {
throw new RuntimeException("Class is not indexed " + clazz);
}
Pageable pageable = PageRequest.of(0, 20);
return searchIndex(clazz, shouldMatch, searchQuery, pageable);
}
@Override
public <T extends EmptyModel> Map<Class<? extends EmptyModel>, List<? extends EmptyModel>> search(QueryBuilder shouldMatch, String searchQuery, Set<Class<? extends EmptyModel>> clazzes) {
final Set<String> indexNames = clazzes.stream().filter(clazz -> indexedEntities.contains(clazz)).map(clazz -> toIndexName(clazz) + INDEX_READ).collect(Collectors.toSet());
if (indexNames.size() == 0) {
LOG.warn("No clazzes to search");
return null;
}
LOG.debug("Searching {} in indices {}", clazzes, indexNames);
Map<Class<? extends EmptyModel>, List<? extends EmptyModel>> hits = new HashMap<>();
Pageable pageable = PageRequest.of(0, 20);
for (Class<? extends EmptyModel> clazz : clazzes) {
hits.put(clazz, searchIndex(clazz, shouldMatch, searchQuery, pageable));
}
return hits;
}
private <T extends EmptyModel> List<T> searchIndex(Class<T> clazz, QueryBuilder shouldMatch, String searchQuery, Pageable pageable) {
String indexName = toIndexName(clazz) + INDEX_READ;
LOG.debug("Searching {} in index {}", clazz, indexName);
BoolQueryBuilder theQuery = boolQuery()
/*@formatter:off*/
.should(multiMatchQuery(searchQuery, "_all")
.field("crop", 0.2f)
.field("code", 1.5f)
.field("name", 1.5f)
.field("title", 1.4f)
.fuzziness(Fuzziness.ONE)
.boost(1.2f)
)
.should(multiMatchQuery(searchQuery, "code", "name", "title", "description")
.field("crop", 0.2f)
.field("code", 1.5f)
.field("name", 1.5f)
.field("title", 1.4f)
.field("description", 0.8f)
.fuzziness(Fuzziness.AUTO)
.boost(2.0f)
)
.should(queryStringQuery(searchQuery)
.lenient(true)
.tieBreaker(1.0f) // was .useDisMaX()
.fuzziness(Fuzziness.TWO)
.boost(0.9f)
);
/*@formatter:on*/
if (shouldMatch != null) {
theQuery.should(shouldMatch);
}
SearchQuery query = new NativeSearchQueryBuilder()
/*@formatter:off*/
.withIndices(indexName)
.withTypes(COMMON_TYPE_NAME)
.withFields("id", "_class")
.withQuery(theQuery)
.withPageable(pageable)
.build();
/*@formatter:on*/
if (!indexedEntities.contains(clazz)) {
throw new RuntimeException("Class is not indexed: " + clazz);
}
return elasticsearchRestTemplate.query(query, (response) -> {
LOG.debug("Search response: {}", response.status());
List<T> entities = new LinkedList<>();
response.getHits().forEach(searchHit -> {
LOG.debug("Hit score={} id={} _class={} _source={}", searchHit.getScore(), searchHit.getId(), clazz, searchHit.getSourceAsString());
T entity = loadEntity(clazz, Long.parseLong(searchHit.getId()));
if (entity != null) {
// Filter things
if (entity instanceof Publishable) {
if (((Publishable) entity).isPublished()) {
entities.add(entity);
}
} else if (entity instanceof VersionedModel) {
if (((VersionedModel) entity).isActive()) {
entities.add(entity);
}
} else {
entities.add(entity);
}
}
});
return entities;
});
}
private <T extends EmptyModel> T loadEntity(Class<T> clazz, Long id) {
return (T) em.find(clazz, id);
}
@Override
public TermResult termStatistics(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String term) throws SearchException {
return termStatistics(clazz, filters, size, new String[] { term }).get(term);
}
@Override
public Map<String, TermResult> termStatistics(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String... terms) throws SearchException {
if (!indexedEntities.contains(clazz)) {
throw new RuntimeException("Class is not indexed " + clazz);
}
Long total = count(clazz, filters);
String indexName = toIndexName(clazz) + INDEX_READ;
SearchSourceBuilder searchSourceBuilder = makeSearchSourceBuilder(clazz, toEsQuery(clazz, filters));
for (String term : terms) {
TermsAggregationBuilder aggregation = AggregationBuilders.terms(term).field(term).size(size).order(BucketOrder.count(false));
searchSourceBuilder.aggregation(aggregation);
}
SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(searchSourceBuilder);
LOG.debug("termStatistics ES query {}", searchRequest);
SearchResponse response = search(searchRequest, RequestOptions.DEFAULT);
Map<String, Aggregation> results = response.getAggregations().asMap();
for (var entry : results.entrySet()) {
LOG.debug("FYI, ES results {} for {} of {}", entry.getKey(), terms, entry.getValue().getClass());
}
Map<String, TermResult> termResults = new HashMap<>();
for (String term : terms) {
Aggregation agg = results.get(term);
if (agg instanceof ParsedTerms) {
ParsedTerms topCounts = (ParsedTerms) agg;
List<? extends Terms.Bucket> buckets = topCounts.getBuckets();
TermResult tr = new TermResult(term, total, buckets.stream().map(bucket -> new Term(bucket.getKeyAsString(), bucket.getDocCount())).collect(Collectors.toList()),
topCounts.getSumOfOtherDocCounts());
termResults.put(term, tr);
} else if (agg instanceof UnmappedTerms) {
UnmappedTerms unmapped = (UnmappedTerms) agg;
throw new RuntimeException("Unmapped term " + term + ": " + unmapped.getBuckets());
} else {
// Doesn't happen
throw new RuntimeException("IDK for term " + term + ": " + agg.getClass());
}
}
return termResults;
}
/**
* Runs TermFacet, but will automatically increase size if #otherCount is more
* than 10%
*/
@Override
public TermResult termStatisticsAuto(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String term) throws SearchException {
return termStatisticsAuto(clazz, filters, size, new String[] { term }).get(term);
}
@Override
public Map<String, TermResult> termStatisticsAuto(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String... terms) throws SearchException {
Map<String, TermResult> termResult = termStatistics(clazz, filters, size, terms);
// TBD
return termResult;
}
@Override
public long count(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filter) throws SearchException {
String indexName = toIndexName(clazz) + INDEX_READ;
SearchSourceBuilder searchSourceBuilder = makeSearchSourceBuilder(clazz, toEsQuery(clazz, filter, null));
searchSourceBuilder.size(0);
SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(searchSourceBuilder);
SearchResponse hits = search(searchRequest, RequestOptions.DEFAULT);
return hits.getHits().getTotalHits();
}
@Override
public Map<String, Long> countMissingValues(final Class<? extends EmptyModel> indexClass, final EmptyModelFilter<?, ?> filter) throws SearchException {
final Map<String, Long> results = new HashMap<>();
final Set<String> fields = jsonSchemas.get(indexClass);
if (CollectionUtils.isEmpty(fields)) {
return results;
}
final String indexName = toIndexName(indexClass) + INDEX_READ;
SearchSourceBuilder sourceBuilder = makeSearchSourceBuilder(indexClass, toEsQuery(indexClass, filter));
sourceBuilder.size(0);
for (String fieldName: fields) {
sourceBuilder.aggregation(AggregationBuilders.missing(fieldName).field(fieldName));
}
SearchRequest esQuery = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
final SearchResponse response = search(esQuery, RequestOptions.DEFAULT);
LOG.debug("Counting missing values took {}s", response.getTook().getSeconds());
for (Aggregation agg: response.getAggregations()) {
long missingCount = ((InternalMissing) agg).getDocCount();
if (missingCount > 0) {
results.put(agg.getName(), missingCount);
}
}
results.put("_totalCount", response.getHits().getTotalHits());
return results;
}
@Override
public TreeNode treeNodeStatistics(final Class<? extends EmptyModel> clazz, final EmptyModelFilter<?, ?> filters, final String[] terms) throws SearchException {
var indexName = toIndexName(clazz) + INDEX_READ;
var iterator = new ReverseListIterator<>(List.of(terms));
TermsAggregationBuilder aggregation = null;
while (iterator.hasNext()) {
String groupBy = iterator.next();
TermsAggregationBuilder agg = AggregationBuilders.terms(groupBy).field(groupBy).size(10);
if (aggregation != null) {
agg.subAggregation(aggregation);
}
aggregation = agg;
}
SearchSourceBuilder searchSourceBuilder = makeSearchSourceBuilder(clazz, toEsQuery(clazz, filters));
searchSourceBuilder.size(0).aggregation(aggregation);
SearchRequest esQuery = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(searchSourceBuilder);
SearchResponse response = search(esQuery, RequestOptions.DEFAULT);
ParsedTerms rootAggregation = response.getAggregations().get(terms[0]);
TreeNode res = new TreeNode(rootAggregation.getName(), terms[0], response.getHits().getTotalHits(), null, filters);
res.children = findChildren( rootAggregation, response.getHits().getTotalHits());
return res;
}
private List<TreeNode> findChildren(final ParsedTerms aggregation, final long totalDocCount) {
List<? extends Bucket> buckets = aggregation.getBuckets();
List<TreeNode> children = new ArrayList<>(buckets.size());
if (! buckets.isEmpty()) {
long otherDocCounts = aggregation.getSumOfOtherDocCounts();
long childrenDocCount = 0;
for (Bucket bucket : buckets) {
if (children.size() < 6 && ((float) bucket.getDocCount() / (totalDocCount - otherDocCounts) * 100) < 10) {
otherDocCounts += bucket.getDocCount();
// ignore results less than 10% of the total count
continue;
}
// build a filter
ObjectNode filter = objectMapper.createObjectNode();
filter.set(aggregation.getName(), objectMapper.createArrayNode().add(bucket.getKey().toString()));
TreeNode child = new TreeNode(aggregation.getName(), bucket.getKey().toString(), bucket.getDocCount(), null, filter);
if (bucket.getAggregations().iterator().hasNext()) {
child.children = findChildren((ParsedTerms) bucket.getAggregations().asList().get(0), bucket.getDocCount());
}
children.add(child);
childrenDocCount += bucket.getDocCount();
}
if (otherDocCounts > 0) {
// build a filter
ArrayNode notValues = objectMapper.createArrayNode();
children.forEach(ch -> notValues.add(ch.getName()));
ObjectNode not = objectMapper.createObjectNode().set(aggregation.getName(), notValues);
ObjectNode filter = objectMapper.createObjectNode().set("NOT", not);
TreeNode other = new TreeNode(aggregation.getName(), "_OTHER", otherDocCounts, null, filter);
children.add(other);
}
// Handle missing
childrenDocCount += otherDocCounts;
if (childrenDocCount < totalDocCount) {
// build a filter
ObjectNode filter = objectMapper.createObjectNode().set("NULL", objectMapper.createArrayNode().add(aggregation.getName()));
TreeNode other = new TreeNode(aggregation.getName(), "_MISSING", totalDocCount - childrenDocCount, null, filter);
children.add(other);
}
}
return children;
}
@Override
@Cacheable(value = "statistics", unless = "#result == null", key = "'stats.' + #root.methodName + '-' + #size + '-' + #targetClass.name + '-' + #indexClass.name")
public List<Object[]> aggregateDate(final int size, final Class<? extends EmptyModel> targetClass, final Class<? extends EmptyModel> indexClass,
final String aggregatedDateField, final String groupingByField, final EmptyModelFilter<?, ?> filter) throws SearchException {
final String DATE_HISTOGRAM_AGG = "updates_over_time";
final String TERMS_AGG = "institute_terms";
final String TOP_HITS_AGG = "top_tag_hits";
final String indexName = toIndexName(indexClass) + INDEX_READ;
AggregationBuilder aggregation =
AggregationBuilders.dateHistogram(DATE_HISTOGRAM_AGG).field(aggregatedDateField).order(BucketOrder.key(false)).minDocCount(1).dateHistogramInterval(DateHistogramInterval.days(1))
.subAggregation(AggregationBuilders.terms(TERMS_AGG).field(groupingByField).size(size)
.subAggregation(AggregationBuilders.topHits(TOP_HITS_AGG).sort(aggregatedDateField, SortOrder.DESC)
.docValueField(groupingByField).docValueField(aggregatedDateField).size(1)));
SearchSourceBuilder searchSourceBuilder = makeSearchSourceBuilder(indexClass, toEsQuery(indexClass, filter));
searchSourceBuilder.size(0).aggregation(aggregation);
SearchRequest esQuery = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(searchSourceBuilder);
SearchResponse response = search(esQuery, RequestOptions.DEFAULT);
InternalHistogram internalHistogram = response.getAggregations().get(DATE_HISTOGRAM_AGG);
List<? extends InternalHistogram.Bucket> buckets = internalHistogram.getBuckets();
final Map<Long, Long> entities = new LinkedHashMap<>(size);
mainLoop:
for (InternalHistogram.Bucket bucket : buckets) {
Terms terms = bucket.getAggregations().get(TERMS_AGG);
for (Bucket instBucket: terms.getBuckets()) {
TopHits topHits = instBucket.getAggregations().get(TOP_HITS_AGG);
SearchHit searchHit = topHits.getHits().getHits()[0];
Map<String, DocumentField> result = searchHit.getFields();
List<Object> datesList = result.get(aggregatedDateField).getValues();
Long maxDate = datesList.stream().mapToLong(v -> (Long)v).max().orElse((Long)datesList.get(0));
Long instId = Long.valueOf(result.get(groupingByField).getValues().get(0).toString());
entities.put(instId, maxDate);
if (entities.size() == size) {
break mainLoop;
}
}
}
List<Object[]> resultList = new ArrayList<>(size);
entities.keySet().forEach(key -> {
final Object[] itemInfo = new Object[4];
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(entities.get(key));
itemInfo[0] = cal.get(Calendar.YEAR);
itemInfo[1] = cal.get(Calendar.MONTH) + 1;
itemInfo[2] = cal.get(Calendar.DAY_OF_MONTH);
itemInfo[3] = loadEntity(targetClass, key);
resultList.add(itemInfo);
});
return resultList;
}
@Override
public <T extends EmptyModel> List<T> find(Class<T> clazz, EmptyModelFilter<?, ?> filter) throws SearchException {
String indexName = toIndexName(clazz) + INDEX_READ;
List<T> results = new LinkedList<>();
SearchRequest esRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(new SearchSourceBuilder().query(toEsQuery(clazz, filter)));
search(esRequest, RequestOptions.DEFAULT)
// map
.getHits().forEach(hit -> {
LOG.trace("Mapping {} id={}", clazz, hit.getId());
T x = loadEntity(clazz, Long.parseLong(hit.getId()));
if (x != null) {
LOG.trace("Adding to results: {}", x);
results.add(x);
} else {
LOG.trace("Got null");
}
});
return results;
}
@Override
public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Pageable page) throws SearchException {
return findAll(clazz, toEsQuery(clazz, filter, null), page, null);
}
@Override
public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Predicate predicate, Pageable page) throws SearchException {
return findAll(clazz, toEsQuery(clazz, filter, predicate), page, null);
}
@Override
public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Predicate predicate, Pageable page,
IEntityLoader<T> entityLoader, String... boostFields) throws SearchException {
return findAll(clazz, toEsQuery(clazz, filter, predicate), page, entityLoader);
}
@Override
public Number[][] getAccessionGeoBounds(EmptyModelFilter<?, ?> filter) throws SearchException {
String indexName = toIndexName(Accession.class) + INDEX_READ;
QueryBuilder queryBuilder = toEsQuery(Accession.class, filter);
SearchSourceBuilder sourceBuilder = makeSearchSourceBuilder(Accession.class, queryBuilder);
var maxLatitude = AggregationBuilders.max("max_latitude").field("latitude");
var minLatitude = AggregationBuilders.min("min_latitude").field("latitude");
var maxLongitude = AggregationBuilders.max("max_longitude").field("longitude");
var minLongitude = AggregationBuilders.min("min_longitude").field("longitude");
sourceBuilder.aggregation(maxLatitude);
sourceBuilder.aggregation(minLatitude);
sourceBuilder.aggregation(maxLongitude);
sourceBuilder.aggregation(minLongitude);
SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
SearchResponse response = search(searchRequest, RequestOptions.DEFAULT);
Max maxLatitudeResult = response.getAggregations().get("max_latitude");
Min minLatitudeResult = response.getAggregations().get("min_latitude");
Max maxLongitudeResult = response.getAggregations().get("max_longitude");
Min minLongitudeResult = response.getAggregations().get("min_longitude");
return new Number[][] { new Number[] { minLatitudeResult.getValue(), maxLongitudeResult.getValue() }, new Number[] { maxLatitudeResult.getValue(), minLongitudeResult.getValue() } };
}
@Override
public Map<String, List<? extends EmptyModel>> fullTextSearch(String text) throws SearchException {
final String TOP_HITS_AGG = "top_five_hits";
final String TERMS_AGG = "index_names";
final String allIndexNames = indexedEntities.stream().map(c -> toIndexName(c) + INDEX_READ).collect(Collectors.joining(","));
TermsAggregationBuilder agg = AggregationBuilders.terms(TERMS_AGG).field("_index").size(indexedEntities.size())
.subAggregation(AggregationBuilders.topHits(TOP_HITS_AGG).sort("_score", SortOrder.DESC).size(10));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(getFullText(text));
sourceBuilder.sort("_score", SortOrder.DESC).aggregation(agg);
SearchRequest searchRequest = new SearchRequest(allIndexNames).types(COMMON_TYPE_NAME).source(sourceBuilder);
SearchResponse response = search(searchRequest, RequestOptions.DEFAULT);
ParsedStringTerms aggregation = response.getAggregations().get(TERMS_AGG);
List<? extends Bucket> buckets = aggregation.getBuckets();
CaseInsensitiveMap<String, Class<EmptyModel>> caseInsensitiveMap = new CaseInsensitiveMap<>(namesToClasses);
Map<String, List<? extends EmptyModel>> result = new LinkedHashMap<>(); // save order
for (Bucket bucket : buckets) {
TopHits topHitsAgg = bucket.getAggregations().get(TOP_HITS_AGG);
SearchHits hits = topHitsAgg.getHits();
String className = bucket.getKey().toString().split("[0-9]")[0];
Class<EmptyModel> clazz = caseInsensitiveMap.get(className);
List<EmptyModel> content = new LinkedList<>();
for (SearchHit hit : hits.getHits()) {
LOG.debug("Mapping {} id={} score={}", clazz.getSimpleName(), hit.getId(), hit.getScore());
EmptyModel x = loadEntity(clazz, Long.parseLong(hit.getId()));
if (x != null) {
LOG.trace("Adding to results: {}", x);
content.add(x);
} else {
LOG.trace("Got null");
}
}
List<EmptyModel> filteredContent = content.stream().filter(record -> {
if (record instanceof Publishable) {
return ((Publishable) record).isPublished();
}
if (record instanceof AclAwareModel) {
return SecurityContextUtil.hasPermission(record, BasePermission.READ);
}
return true;
}).limit(5).collect(Collectors.toList());
if (!filteredContent.isEmpty()) {
result.put(clazz.getSimpleName(), filteredContent);
}
}
return result;
}
private <T extends EmptyModel> Page<T> findAll(Class<T> clazz, QueryBuilder query, Pageable page, ElasticsearchService.IEntityLoader<T> entityLoader) throws SearchException {
String indexName = toIndexName(clazz) + INDEX_READ;
if (page.getOffset() > 100000) {
LOG.warn("Large offset={} for ES query", page.getOffset());
}
SearchSourceBuilder sourceBuilder = makeSearchSourceBuilder(clazz, query);
sourceBuilder.from((int) page.getOffset()).size(page.getPageSize());
SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
LOG.debug("findAll ES query {}", searchRequest);
SearchResponse response = search(searchRequest, RequestOptions.DEFAULT);
List<T> content = new LinkedList<>();
if (entityLoader != null) {
// Use custom entity loader
content.addAll(entityLoader.loadEntities(StreamSupport.stream(response.getHits().spliterator(), false).map(searchHit -> {
if (LOG.isTraceEnabled()) {
LOG.trace("Hit score={} id={} _class={} _source={}", searchHit.getScore(), searchHit.getId(), clazz, searchHit.getSourceAsString());
}
return Long.parseLong(searchHit.getId());
}).collect(Collectors.toList())));
} else {
// Use simple entityManager loader
response.getHits().forEach(hit -> {
LOG.debug("Mapping {} id={} score={}", clazz, hit.getId(), hit.getScore());
T x = loadEntity(clazz, Long.parseLong(hit.getId()));
if (x != null) {
LOG.trace("Adding to results: {}", x);
content.add(x);
} else {
LOG.trace("Got null");
}
});
}
return new PageImpl<>(content, page, response.getHits().getTotalHits());
}
private SearchSourceBuilder makeSearchSourceBuilder(Class<? extends EmptyModel> clazz, QueryBuilder query) throws SearchException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(query);
sourceBuilder.sort("_score", SortOrder.DESC);
// String indexName = toIndexName(clazz) + INDEX_READ;
// SearchRequest esRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
// SearchResponse response = search(esRequest, RequestOptions.DEFAULT);
// float maxScore = response.getHits().getMaxScore();
// float minScore = maxScore * 0.5f;
// sourceBuilder = new SearchSourceBuilder().query(query);
// sourceBuilder.sort("_score", SortOrder.DESC);
// sourceBuilder.minScore(minScore);
return sourceBuilder;
}
@Override
public <T extends EmptyModel> void process(Class<T> clazz, EmptyModelFilter<?, ?> filter, IBatchAction<T> action, Long maxSize) throws Exception, SearchException {
processById(clazz, filter, (ids) -> {
List<T> loadedEntities = ids.stream().map(id -> loadEntity(clazz, id)).filter(entity -> entity != null).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(loadedEntities)) {
action.apply(loadedEntities);
}
}, maxSize == null ? Pageable.unpaged() : Pageable.ofSize(maxSize.intValue()));
}
@Override
public <T extends EmptyModel> void processById(Class<T> clazz, EmptyModelFilter<?, ?> filter, IBatchAction<Long> action, @NonNull Pageable page) throws Exception, SearchException {
String indexName = toIndexName(clazz) + INDEX_READ;
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(toEsQuery(clazz, filter));
sourceBuilder.sort(SortBuilders.fieldSort("id").order(SortOrder.ASC));
// This doesn't work: sourceBuilder.from((int) page.getOffset()).size(page.getPageSize());
// Result window is too large, from + size must be less than or equal to: [10000]
// Need to use scrolling.
sourceBuilder.size(10000);
TimeValue scrollTimeout = TimeValue.timeValueSeconds(30);
SearchRequest esQuery = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder).scroll(scrollTimeout);
SearchResponse searchResponse = search(esQuery, RequestOptions.DEFAULT);
SearchHits hits = searchResponse.getHits();
LOG.info("Got total {} hits from ES", hits.getTotalHits());
if (page.isPaged() && hits.getTotalHits() < page.getOffset()) {
LOG.warn("ES total hits {} are less than the requested offset {}", hits.getTotalHits(), page.getOffset());
return;
}
AtomicLong counter = new AtomicLong();
List<Long> content = new LinkedList<>(); // Contains requested page of IDs
while (hits.getHits().length > 0) {
LOG.debug("Processing {} hits", hits.getHits().length);
if (page.isPaged() && page.getOffset() > counter.get() + hits.getHits().length) { // Offset is bigger than entire hits window
counter.addAndGet(hits.getHits().length); // Just add the number of hits and don't read them
LOG.trace("Skipped {} for offset {}", counter.get(), page.getOffset());
} else {
// Skip what needs skipping
for (var hit : searchResponse.getHits()) {
if (page.isUnpaged() || counter.incrementAndGet() > page.getOffset()) { // We're in the offset window
if (page.isUnpaged() || counter.get() <= page.getOffset() + page.getPageSize()) { // And we have not yet processed the requested page
content.add(Long.parseLong(hit.getId())); // Add ID to results
} else {
break;
}
}
}
}
if (! content.isEmpty()) {
LOG.debug("Processing {} IDs from ES", content.size());
action.apply(content);
content.clear();
}
if (page.isPaged() && counter.get() >= page.getOffset() + page.getPageSize()) { // We're done processing the requested page
return;
}
// Scroll next
String scrollId = searchResponse.getScrollId();
searchResponse = client.scroll(new SearchScrollRequest(scrollId).scroll(scrollTimeout), RequestOptions.DEFAULT);
hits = searchResponse.getHits();
LOG.debug("Got scroll hits {}", hits.getTotalHits());
}
}
@Override
public List<Double[]> distinctCoordinates(Predicate filt, String _text) throws SearchException {
ElasticQueryBuilder esQb = new ElasticQueryBuilder();
if (filt != null) {
filt.accept(esQb, null);
}
QueryBuilder esQuery = esQb.getQuery();
if (StringUtils.isNotBlank(_text)) {
BoolQueryBuilder fulltext = getFullText(_text, AccessionFilter.ES_BOOSTED_FIELDS);
if (esQuery instanceof BoolQueryBuilder) {
esQuery = ((BoolQueryBuilder) esQuery).must(fulltext);
} else {
BoolQueryBuilder builder = new BoolQueryBuilder();
esQuery = builder.filter(esQuery).must(fulltext);
}
}
TimeValue scrollTimeout = TimeValue.timeValueSeconds(30);
String indexName = toIndexName(Accession.class) + INDEX_READ;
SearchSourceBuilder searchSourceBuilder = makeSearchSourceBuilder(Accession.class, esQuery);
searchSourceBuilder
.storedField(ACCESSION_GEO_LATITUDE)
.storedField(ACCESSION_GEO_LONGITUDE) // We only want lat+lon
.sort(SortBuilders.fieldSort("_doc"))
.size(1000); // 10000 is the max allowed by ES
SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(searchSourceBuilder).scroll(scrollTimeout);
SearchResponse searchResponse = search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = searchResponse.getHits();
// System.err.println("Got hits " + hits.getTotalHits());
Set<Point> coordinates = Sets.newHashSet();
while (hits.getHits().length > 0) {
LOG.trace("Processing {} hits ", hits.getHits().length);
hits.forEach(hit -> {
Map<String, DocumentField> fields = hit.getFields();
// fields.values().forEach(hitField -> System.err.print(" " + hitField.getValue()));
// System.err.println();
coordinates.add(new Point(
((Number) fields.get(ACCESSION_GEO_LONGITUDE).getValue()).doubleValue(),
((Number) fields.get(ACCESSION_GEO_LATITUDE).getValue()).doubleValue()
));
});
String scrollId = searchResponse.getScrollId();
try {
searchResponse = client.scroll(new SearchScrollRequest(scrollId).scroll(scrollTimeout), RequestOptions.DEFAULT);
} catch (IOException e) {
LOG.warn("Error occurred during scroll search", e);
throw new SearchException(e.getMessage(), e.getCause());
}
hits = searchResponse.getHits();
LOG.trace("Got {} scroll hits with scrollId={}", hits.getTotalHits(), scrollId);
}
// LOG.debug("Done scrolling");
return coordinates.stream().map(point -> new Double[] { point.getX(), point.getY() }).collect(Collectors.toList());
}
private QueryBuilder toEsQuery(Class<?> clazz, EmptyModelFilter<?, ?> filters) {
return toEsQuery(clazz, filters, null);
}
private QueryBuilder toEsQuery(Class<?> clazz, EmptyModelFilter<?, ?> filters, Predicate predicate) {
ElasticQueryBuilder esQb = new ElasticQueryBuilder();
if (filters != null) {
BooleanBuilder builder = new BooleanBuilder();
if (predicate != null) {
builder.and(predicate);
}
builder.and(filters.buildPredicate()).accept(esQb, clazz);
}
QueryBuilder esQuery = esQb.getQuery();
if (filters instanceof IFullTextFilter) {
String text = ((IFullTextFilter) filters).get_text();
if (StringUtils.isNotBlank(text)) {
BoolQueryBuilder fulltext = getFullText(text, filters.boostedFields());
if (esQuery instanceof BoolQueryBuilder) {
esQuery = ((BoolQueryBuilder) esQuery).must(fulltext);
} else {
BoolQueryBuilder builder = new BoolQueryBuilder();
esQuery = builder.filter(esQuery).must(fulltext);
}
}
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Converted {} to\ncurl -XGET 'localhost:???/accession_read/_search?pretty' -H 'Content-type: application/json' -d '\n{ \"query\": \n{} }\n'", new ObjectMapper().writeValueAsString(filters), esQuery);
}
} catch (JsonProcessingException e) {
}
return esQuery;
}
private BoolQueryBuilder getFullText(String text, String... boostFields) {
var charsToEscape = RESERVED_CHARACTERS.stream().filter(text::contains).collect(Collectors.toSet());
for (String charToEscape : charsToEscape) {
text = text.replace(charToEscape, "\\" + charToEscape);
}
BoolQueryBuilder fulltext = boolQuery()
/*@formatter:off*/
.should(multiMatchQuery(text, "_texts")
.fuzziness(Fuzziness.AUTO)
// .minimumShouldMatch("75%")
.boost(2.0f)
)
.should(queryStringQuery(text)
.lenient(true)
.tieBreaker(1.0f) // was .useDisMaX()
.fuzziness(Fuzziness.AUTO)
.boost(0.9f)
);
/*@formatter:on*/
{
SimpleQueryStringBuilder sqsQuery = simpleQueryStringQuery(text)
/*@formatter:off*/
.lenient(true)
.defaultOperator(Operator.AND)
.field("*")
.boost(1.0f)
// .minimumShouldMatch("75%")
/*@formatter:on*/
;
sqsQuery.field("_texts", 2);
if (boostFields != null && boostFields.length > 0) {
for (String field : boostFields) {
sqsQuery.field(field, 3);
}
}
fulltext.should(sqsQuery);
}
fulltext.minimumShouldMatch("70%");
return fulltext;
}
private SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws SearchException {
try {
return client.search(searchRequest, options);
} catch (Throwable e) {
LOG.error("Error occurred during search", e);
throw new SearchException(e.getMessage(), e.getCause());
}
}
}