KPIServiceImpl.java
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.server.service.impl;
import static org.genesys.server.model.kpi.QDimensionKey.dimensionKey;
import static org.genesys.server.model.kpi.QExecution.execution;
import static org.genesys.server.model.kpi.QExecutionRun.executionRun;
import static org.genesys.server.model.kpi.QObservation.observation;
import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.persistence.EntityManager;
import javax.persistence.EntityNotFoundException;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.persistence.TypedQuery;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys.server.component.security.SecurityUtils;
import org.genesys.server.exception.DetailedConstraintViolationException;
import org.genesys.server.exception.InvalidApiUsageException;
import org.genesys.server.exception.NotFoundElement;
import org.genesys.server.model.UserRole;
import org.genesys.server.model.kpi.Dimension;
import org.genesys.server.model.kpi.DimensionKey;
import org.genesys.server.model.kpi.Execution;
import org.genesys.server.model.kpi.ExecutionGroup;
import org.genesys.server.model.kpi.ExecutionRun;
import org.genesys.server.model.kpi.JpaDimension;
import org.genesys.server.model.kpi.KPIParameter;
import org.genesys.server.model.kpi.Observation;
import org.genesys.server.model.kpi.QDimensionKey;
import org.genesys.server.model.kpi.QExecutionRun;
import org.genesys.server.model.kpi.QObservation;
import org.genesys.server.persistence.kpi.DimensionKeyRepository;
import org.genesys.server.persistence.kpi.DimensionRepository;
import org.genesys.server.persistence.kpi.ExecutionRepository;
import org.genesys.server.persistence.kpi.ExecutionRunRepository;
import org.genesys.server.persistence.kpi.KPIParameterRepository;
import org.genesys.server.persistence.kpi.ObservationRepository;
import org.genesys.server.service.KPIService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.security.access.prepost.PostAuthorize;
import org.springframework.security.access.prepost.PostFilter;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.acls.domain.BasePermission;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import com.querydsl.core.Tuple;
import com.querydsl.core.types.OrderSpecifier;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
@Service
@Transactional(readOnly = true)
@Validated
public class KPIServiceImpl implements KPIService {
/** Logger used by this service. */
public static final Logger LOG = LoggerFactory.getLogger(KPIServiceImpl.class);
// JPA entity manager; used in this class to create the JPQL queries for JPA dimensions and KPI executions.
@PersistenceContext
private EntityManager entityManager;
// Repository for Dimension records.
@Autowired
private DimensionRepository dimensionRepository;
// Repository for KPIParameter records.
@Autowired
private KPIParameterRepository parameterRepository;
// Repository for Execution records.
@Autowired
private ExecutionRepository executionRepository;
// Repository for Observation records.
@Autowired
private ObservationRepository observationRepository;
// Repository for DimensionKey records (name/value pairs attached to observations).
@Autowired
private DimensionKeyRepository dimensionKeyRepository;
// Repository for ExecutionRun records.
@Autowired
private ExecutionRunRepository executionRunRepository;
// QueryDSL query factory used for the type-safe queries in this class.
@Autowired
private JPAQueryFactory jpaQueryFactory;
// Security helper; used here for role checks and for listing object identities readable by the current user.
@Autowired
private SecurityUtils securityUtils;
// Bean validator; used to re-validate an Execution before it is run.
@Autowired
private Validator validator;
/**
 * Creates or updates a KPI parameter.
 * <p>
 * A parameter without an id is persisted as-is. A parameter with an id is
 * looked up, the stored record is updated through {@code apply(parameter)}
 * and then saved.
 *
 * @param parameter the parameter to persist
 * @return the saved parameter as returned by the repository
 * @throws NotFoundElement if an id is given but no record exists for it
 */
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#parameter, 'ADMINISTRATION')")
@Override
@Transactional
public KPIParameter save(KPIParameter parameter) {
    if (parameter.getId() == null) {
        // New record: nothing to merge into
        return parameterRepository.save(parameter);
    }
    final KPIParameter stored = parameterRepository.findById(parameter.getId())
        .orElseThrow(() -> new NotFoundElement("Target record not found"));
    stored.apply(parameter);
    return parameterRepository.save(stored);
}
/**
 * Looks up a KPI parameter by its id.
 *
 * @param id the parameter id
 * @return the parameter, or {@code null} when no record has this id
 */
@Override
public KPIParameter getParameter(long id) {
    final var lookup = parameterRepository.findById(id);
    return lookup.orElse(null);
}
/**
 * Looks up a KPI parameter by its name.
 *
 * @param name the parameter name
 * @return whatever the repository returns for this name
 */
@Override
public KPIParameter getParameter(String name) {
    final KPIParameter byName = parameterRepository.findByName(name);
    return byName;
}
/**
 * Lists KPI parameters one page at a time.
 *
 * @param page the paging request
 * @return the requested page of parameters
 */
@Override
public Page<KPIParameter> listParameters(Pageable page) {
    final Page<KPIParameter> parameters = parameterRepository.findAll(page);
    return parameters;
}
/**
 * Deletes a KPI parameter.
 *
 * @param parameter the parameter to delete
 * @return the same instance, with its id reset to {@code null} after deletion
 */
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#parameter, 'ADMINISTRATION')")
@Override
@Transactional
public KPIParameter delete(KPIParameter parameter) {
    final KPIParameter removed = parameter;
    parameterRepository.delete(removed);
    // The record is gone: clear the identifier on the returned instance
    removed.setId(null);
    return removed;
}
/**
 * Creates or updates a dimension.
 * <p>
 * A dimension with an id is loaded from the repository and updated through
 * {@code apply(dimension)}; a dimension without an id is persisted directly.
 * The saved dimension is passed through {@code lazyLoad} before returning.
 *
 * @param dimension the dimension to persist
 * @return the saved dimension
 * @throws NotFoundElement if an id is given but no record exists for it
 */
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#dimension, 'ADMINISTRATION')")
@Override
@Transactional
public <T extends Dimension<?>> T save(T dimension) {
    final boolean isNew = dimension.getId() == null;
    final T toPersist;
    if (isNew) {
        LOG.debug("Persising dimension {}", dimension);
        toPersist = dimension;
    } else {
        LOG.debug("Updating dimension {}", dimension);
        toPersist = (T) dimensionRepository.findById(dimension.getId()).orElseThrow(() -> new NotFoundElement("Record not found"));
        toPersist.apply(dimension);
    }
    final T saved = dimensionRepository.save(toPersist);
    return lazyLoad(saved);
}
/**
 * Deletes a dimension.
 *
 * @param dimension the dimension to delete
 * @return the same instance, with its id reset to {@code null} after deletion
 */
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#dimension, 'ADMINISTRATION')")
@Override
@Transactional
public <T extends Dimension<?>> T delete(T dimension) {
    final T removed = dimension;
    dimensionRepository.delete(removed);
    // The record is gone: clear the identifier on the returned instance
    removed.setId(null);
    return removed;
}
/**
 * Loads a dimension by id and initializes its values.
 * <p>
 * Initialization of the values collection is delegated to {@code lazyLoad},
 * which skips JPA dimensions and tolerates a {@code null} values collection.
 * The previous extra {@code dim.getValues().size()} call did the same work
 * without the null check and could fail with a NullPointerException.
 *
 * @param id the dimension id
 * @return the dimension
 * @throws EntityNotFoundException if no dimension has this id
 */
@Override
public Dimension<?> getDimension(long id) {
    final Dimension<?> dim = dimensionRepository.findById(id).orElseThrow(() -> new EntityNotFoundException("Record not found by id=" + id));
    return lazyLoad(dim);
}
/**
 * Loads a dimension by name and initializes its values.
 * <p>
 * A missing dimension is reported with an {@link EntityNotFoundException},
 * as in the lookup by id. Previously a {@code null} repository result was
 * dereferenced and failed with a NullPointerException.
 *
 * @param name the dimension name
 * @return the dimension
 * @throws EntityNotFoundException if no dimension has this name
 */
@Override
public Dimension<?> getDimension(String name) {
    final Dimension<?> dim = dimensionRepository.findByName(name);
    if (dim == null) {
        throw new EntityNotFoundException("Record not found by name=" + name);
    }
    // lazyLoad initializes the values of non-JPA dimensions (null-safe)
    return lazyLoad(dim);
}
/**
 * Lists dimensions one page at a time.
 *
 * @param page the paging request
 * @return the requested page of dimensions
 */
@Override
public Page<Dimension<?>> listDimensions(Pageable page) {
    final Page<Dimension<?>> dimensions = dimensionRepository.findAll(page);
    return dimensions;
}
/**
 * Loads an execution by id.
 *
 * @param id the execution id
 * @return the execution
 * @throws NotFoundElement if no execution has this id
 */
@PostAuthorize("hasRole('ADMINISTRATOR') || returnObject==null || hasPermission(returnObject, 'READ')")
@Override
public Execution getExecution(long id) {
    return executionRepository.findById(id)
        .orElseThrow(() -> new NotFoundElement("Record not found by id=" + id));
}
/**
 * Loads an execution by name and initializes its lazy associations.
 *
 * @param executionName the execution name
 * @return the execution after it has been passed through {@code lazyLoad}
 * @throws NotFoundElement if no execution has this name
 */
@PostAuthorize("hasRole('ADMINISTRATOR') || returnObject==null || hasPermission(returnObject, 'READ')")
@Override
public Execution loadExecution(String executionName) {
    final Execution found = getExecution(executionName);
    return lazyLoad(found);
}
/**
 * Loads an execution by name.
 *
 * @param executionName the execution name
 * @return the execution
 * @throws NotFoundElement if no execution has this name
 */
@PostAuthorize("hasRole('ADMINISTRATOR') || returnObject==null || hasPermission(returnObject, 'READ')")
@Override
public Execution getExecution(String executionName) {
    final Execution found = executionRepository.findByName(executionName);
    if (found != null) {
        return found;
    }
    throw new NotFoundElement("Record not found by name=" + executionName);
}
/**
 * Finds the most recent run of an execution, as ordered by the repository's
 * {@code findLast} query.
 *
 * @param execution the execution whose runs are searched
 * @return the run from the first single-element page, passed through
 *         {@code lazyLoad}; {@code null} when that page does not hold
 *         exactly one run
 */
@Override
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'READ')")
public ExecutionRun findLastExecutionRun(Execution execution) {
    final Page<ExecutionRun> firstPage = executionRunRepository.findLast(execution, PageRequest.of(0, 1));
    ExecutionRun lastRun = null;
    if (firstPage.getNumberOfElements() == 1) {
        lastRun = firstPage.getContent().get(0);
    }
    return lazyLoad(lastRun);
}
/**
 * Finds the latest run of an execution whose timestamp is before the day
 * following {@code date}.
 *
 * @param execution the execution whose runs are searched
 * @param date the day up to which runs are considered
 * @return the matching run with the highest timestamp (ties broken by the
 *         highest id), passed through {@code lazyLoad}; {@code null} if none
 */
@Override
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'READ')")
public ExecutionRun findExecutionRunByDate(Execution execution, LocalDate date) {
    final LocalDate dayAfter = date.plusDays(1);
    final ExecutionRun latestRun = jpaQueryFactory.selectFrom(executionRun)
        .where(executionRun.execution().eq(execution).and(executionRun.timestamp.before(dayAfter)))
        .orderBy(executionRun.timestamp.desc(), executionRun.id.desc())
        .limit(1)
        .fetchFirst();
    return lazyLoad(latestRun);
}
/**
 * Creates or updates an execution.
 * <p>
 * An execution with an id is loaded and updated through {@code copyValues};
 * an execution without an id is persisted directly. The saved execution is
 * passed through {@code lazyLoad} before returning.
 *
 * @param execution the execution to persist
 * @return the saved execution
 * @throws NotFoundElement if an id is given but no record exists for it
 */
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'ADMINISTRATION') or hasPermission(#execution, 'WRITE')")
@Override
@Transactional
public Execution save(Execution execution) {
    if (execution.getId() == null) {
        // New record: nothing to merge into
        return lazyLoad(executionRepository.save(execution));
    }
    final Execution stored = executionRepository.findById(execution.getId())
        .orElseThrow(() -> new NotFoundElement("No such Execution"));
    copyValues(stored, execution);
    return lazyLoad(executionRepository.save(stored));
}
/**
 * Lists all executions. The returned list is filtered by the
 * {@code @PostFilter} expression on this method.
 *
 * @return the executions returned by the repository
 */
@Override
@PostFilter("hasRole('ADMINISTRATOR') or hasPermission(filterObject, 'READ')")
public List<Execution> listExecutions() {
    final List<Execution> all = executionRepository.findAll();
    return all;
}
/**
 * Lists executions one page at a time.
 * <p>
 * Administrators get every execution. Other users get only the executions
 * whose ids are reported by
 * {@code securityUtils.listObjectIdentityIdsForCurrentUser} for the READ
 * permission.
 *
 * @param page the paging request (offset and page size are applied)
 * @return the requested page of executions
 */
@Override
public Page<Execution> listExecutions(Pageable page) {
    if (securityUtils.hasRole(UserRole.ADMINISTRATOR)) {
        return executionRepository.findAll(page);
    }
    Set<Long> executionIds = new HashSet<>(securityUtils.listObjectIdentityIdsForCurrentUser(Execution.class, BasePermission.READ));
    if (executionIds.isEmpty()) {
        // Nothing is readable: skip the query instead of sending an empty IN () predicate to the database
        return new PageImpl<>(Collections.<Execution>emptyList(), page, 0);
    }
    JPAQuery<Execution> query = jpaQueryFactory.selectFrom(execution).where(execution.id.in(executionIds));
    // Count before offset/limit are applied to the (mutable) query
    long total = query.fetchCount();
    List<Execution> content = query.offset(page.getOffset()).limit(page.getPageSize()).fetch();
    return new PageImpl<>(content, page, total);
}
/**
 * Deletes an execution together with its runs.
 *
 * @param execution the execution to delete
 * @return the instance that was passed in (its id is left untouched)
 */
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'ADMINISTRATION') or hasPermission(#execution, 'DELETE')")
@Override
@Transactional
public Execution delete(Execution execution) {
    final Execution removed = execution;
    // Runs go first, then the execution itself.
    // NOTE(review): an earlier comment here said an exception is expected when observations still exist — confirm.
    executionRunRepository.deleteByExecution(removed);
    executionRepository.delete(removed);
    return removed;
}
/**
 * Deletes the observations of one execution run.
 *
 * @param executionRun the run whose observations are removed
 */
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#executionRun.execution, 'ADMINISTRATION') or hasPermission(#executionRun.execution, 'DELETE')")
@Override
@Transactional
public void deleteObservations(ExecutionRun executionRun) {
    final ExecutionRun run = executionRun;
    observationRepository.deleteByExecutionRun(run);
}
/**
 * Returns the set of values of a dimension.
 * <p>
 * For a {@link JpaDimension} the values are selected from the database with
 * a {@code select distinct} JPQL query assembled from the dimension's
 * entity, field and optional condition. Any other dimension returns its own
 * values collection.
 *
 * @param dim the dimension
 * @return the dimension values
 */
@Override
public <T> Set<T> getValues(Dimension<T> dim) {
    if (!(dim instanceof JpaDimension)) {
        // Non-JPA dimensions carry their values themselves
        return dim.getValues();
    }
    final JpaDimension jpaDim = (JpaDimension) dim;
    // The query text is concatenated from the dimension definition
    String jpql = "select distinct a." + jpaDim.getField() + " from " + jpaDim.getEntity() + " a";
    if (StringUtils.isNotBlank(jpaDim.getCondition())) {
        jpql = jpql + " where " + jpaDim.getCondition();
    }
    LOG.debug(jpql);
    final TypedQuery<T> valuesQuery = entityManager.createQuery(jpql, dim.getTargetType());
    return new HashSet<T>(valuesQuery.getResultList());
}
/**
 * Binds the positional parameters, runs the query and converts every result
 * row into an {@link Observation}.
 * <p>
 * Supported row shapes:
 * <ul>
 * <li>{@code null} rows are skipped;</li>
 * <li>a single {@link Number} becomes the observation value;</li>
 * <li>an {@code Object[]} holds one column per execution group first, then
 * the value, then optionally a count and a standard deviation.</li>
 * </ul>
 *
 * @param execution the execution whose groups describe the leading columns
 * @param query the query to run
 * @param params values bound to positional parameters 1..n
 * @return the observations, one per non-null row
 * @throws RuntimeException if a row is of any other type
 */
private List<Observation> getObservationResults(Execution execution, Query query, Object... params) {
    int position = 1;
    for (final Object param : params) {
        LOG.debug("\t?{} = {}", position, param);
        query.setParameter(position, param);
        position++;
    }
    final List<Object> rows = query.getResultList();
    final List<Observation> observations = new ArrayList<>(rows.size());
    for (final Object row : rows) {
        LOG.trace("Query result: {}", row);
        if (row == null) {
            continue;
        }
        if (row instanceof Number) {
            // Plain aggregate without grouping
            final Observation single = new Observation();
            single.setValue(((Number) row).doubleValue());
            observations.add(single);
            continue;
        }
        if (!(row instanceof Object[])) {
            throw new RuntimeException("Unrecognized return type " + row.getClass() + " for " + row);
        }
        // Group-by result, or AVERAGE+STDEV+COUNT, or SUM+COUNT
        final Object[] columns = (Object[]) row;
        final Observation grouped = new Observation();
        int column = 0;
        // The query is designed to return the group-by columns first
        for (final ExecutionGroup group : execution.getGroups()) {
            final Object rawValue = columns[column];
            final String groupValue = rawValue == null ? "" : rawValue.toString();
            DimensionKey key = dimensionKeyRepository.findByNameAndValue(group.toName(), groupValue);
            if (key == null) {
                key = DimensionKey.createFor(group.toName(), groupValue);
            }
            grouped.getDimensions().add(key);
            column++;
        }
        final int valueAt = column;
        final int countAt = column + 1;
        final int stdDevAt = column + 2;
        if (columns[valueAt] != null) {
            grouped.setValue(((Number) columns[valueAt]).doubleValue());
        }
        if (columns.length > countAt && columns[countAt] != null) {
            grouped.setCount(((Number) columns[countAt]).longValue());
        }
        if (columns.length > stdDevAt && columns[stdDevAt] != null) {
            grouped.setStdDev(((Number) columns[stdDevAt]).doubleValue());
        }
        observations.add(grouped);
    }
    return observations;
}
/**
 * Runs an execution and returns the observations without saving them.
 * <p>
 * The execution is reloaded by id and validated (the execution, its list of
 * execution dimensions and its parameter) before its query is created and
 * evaluated through {@code internalExecute}.
 *
 * @param execution the execution to run; only its id is used for the reload
 * @return the observations collected for all dimension value combinations
 * @throws EntityNotFoundException if the execution no longer exists
 * @throws DetailedConstraintViolationException if validation reports violations
 */
// readonly mode
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'ADMINISTRATION') or hasPermission(#execution, 'WRITE')")
@Override
public List<Observation> execute(Execution execution) {
    final Execution managed = executionRepository.findById(execution.getId()).orElseThrow(EntityNotFoundException::new);
    final Set<ConstraintViolation<?>> violations = new HashSet<>();
    violations.addAll(validator.validate(managed));
    violations.addAll(validator.validate(managed.getExecutionDimensions()));
    violations.addAll(validator.validate(managed.getParameter()));
    if (!violations.isEmpty()) {
        throw new DetailedConstraintViolationException("Execution is no longer valid", violations);
    }
    LOG.warn("Running KPI Execution query: {}", managed.query());
    final Query query = entityManager.createQuery(managed.query());
    final List<Observation> results = new ArrayList<Observation>();
    // Start the recursion at the first dimension with no parameters bound yet
    internalExecute(query, managed, results, 0, new ArrayList<Object>());
    return results;
}
/**
 * Runs an execution and stores the result as a new {@link ExecutionRun}
 * with its observations.
 * <p>
 * Steps: run the query (timed), fail fast when nothing was observed, save
 * the run, then replace every observation's dimension keys with persisted
 * {@link DimensionKey} instances (existing ones are looked up through a
 * per-run cache, unknown ones are saved) and save the observations.
 *
 * @param execution the execution to run
 * @return the saved run with its saved observations
 * @throws RuntimeException with message "No observations recorded" when the
 *         execution produced no observations
 */
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'ADMINISTRATION') or hasPermission(#execution, 'WRITE')")
@Override
@Transactional
public ExecutionRun executeAndSave(Execution execution) {
    var stopWatch = StopWatch.createStarted();
    List<Observation> observations = execute(execution);
    stopWatch.stop();
    if (observations.isEmpty()) {
        // Fail before anything is persisted; the run would be rolled back anyway
        throw new RuntimeException("No observations recorded");
    }
    ExecutionRun executionRun = new ExecutionRun();
    executionRun.setExecution(execution);
    executionRun.setTimestamp(LocalDate.now());
    executionRun.setQueryTime(stopWatch.getTime(TimeUnit.MILLISECONDS));
    executionRun = executionRunRepository.save(executionRun);
    // Cache: dimension key name -> (value -> persisted DimensionKey)
    Map<String, Map<String, DimensionKey>> runCache = new HashMap<String, Map<String, DimensionKey>>();
    for (Observation obs : observations) {
        for (DimensionKey dk : obs.getDimensions()) {
            if (!runCache.containsKey(dk.getName())) {
                LOG.info("Loading map of values for {}", dk.getName());
                // Each row is expected to be [value, DimensionKey]
                List<Object[]> rows = dimensionKeyRepository.loadMap(dk.getName());
                Map<String, DimensionKey> byValue = new HashMap<String, DimensionKey>();
                for (Object[] row : rows) {
                    byValue.put((String) row[0], (DimensionKey) row[1]);
                }
                LOG.info("Map {} size={}", dk.getName(), byValue.size());
                LOG.info(byValue.toString());
                runCache.put(dk.getName(), byValue);
            }
        }
    }
    for (Observation obs : observations) {
        List<DimensionKey> dims = new ArrayList<DimensionKey>();
        for (DimensionKey dk : obs.getDimensions()) {
            try {
                if (LOG.isDebugEnabled())
                    LOG.debug("Finding dk: {}", dk);
                DimensionKey existing = runCache.get(dk.getName()).get(dk.getValue());
                if (existing == null) {
                    LOG.info("New dk: {}", dk);
                    DimensionKey newDk = makeDimensionKey(dk);
                    runCache.get(dk.getName()).put(dk.getValue(), newDk);
                    dims.add(newDk);
                } else {
                    dims.add(existing);
                }
            } catch (RuntimeException e) {
                // Best effort: log with the full stack trace and retry saving the key directly
                LOG.error("Failed to resolve dimension key {}: {}", dk, e.getMessage(), e);
                LOG.info("New dk: {}", dk);
                dims.add(makeDimensionKey(dk));
            }
        }
        obs.setDimensions(dims);
        obs.setExecutionRun(executionRun);
    }
    executionRun.setObservations(observationRepository.saveAll(observations));
    return executionRun;
}
// private void removeSameResult(ExecutionRun previousRun, List<Observation>
// observations) {
// List<Observation> previousObservations =
// observationRepository.findByExecutionRun(previousRun);
// LOG.info("Got previous observations size=" +
// previousObservations.size());
// for (int i = observations.size() - 1; i >= 0; i--) {
// if (!observationChanged(observations.get(i), previousObservations)) {
// if (LOG.isDebugEnabled())
// LOG.debug("Removing unchanged observation: " + observations.get(i));
// observations.remove(i);
// }
// }
// }
// private boolean observationChanged(Observation observation,
// List<Observation> observations) {
// Set<DimensionKey> dims = observation.getDimensions();
// Observation previous = findObservationByDimensionKeys(observations,
// dims);
// if (previous == null) {
// // No matching previous observation
// if (LOG.isDebugEnabled())
// LOG.debug("No match found");
// return true;
// }
// if (previous.getValue() != observation.getValue()) {
// // Value is different
// if (LOG.isDebugEnabled())
// LOG.debug("Values are different " + previous + "!=" + observation);
// return true;
// }
// // No change
// return false;
// }
//
// private Observation findObservationByDimensionKeys(List<Observation>
// observations, Set<DimensionKey> dims) {
// for (Observation obs : observations) {
// if (obs.hasDimensionKeys(dims)) {
// return obs;
// }
// }
// return null;
// }
/**
 * Saves a dimension key.
 * <p>
 * NOTE(review): this method is private and only called from within this
 * class, so proxy-based {@code @Transactional} handling presumably does not
 * apply to it — confirm against the project's transaction configuration.
 *
 * @param dk the dimension key to save
 * @return the instance returned by the repository
 */
@Transactional
private DimensionKey makeDimensionKey(DimensionKey dk) {
    final DimensionKey persisted = dimensionKeyRepository.save(dk);
    return persisted;
}
/**
 * Recursively enumerates the value combinations of the execution's
 * dimensions and runs the query once per combination.
 * <p>
 * At each depth the dimension at that index supplies the candidate values;
 * each value is appended to {@code params}, the next depth is processed and
 * the value is removed again. When no dimension exists at {@code depth} the
 * query is run with the accumulated parameters and every resulting
 * observation is registered.
 *
 * @param query the query to run
 * @param execution the execution being evaluated
 * @param results collector for the registered observations
 * @param depth index of the dimension handled by this call
 * @param params values chosen so far, one per dimension above this depth
 */
private void internalExecute(Query query, Execution execution, List<Observation> results, int depth, List<Object> params) {
    final Dimension<?> dimension = execution.getDimension(depth);
    if (dimension != null) {
        // Recurse: try every value of this dimension
        for (final Object candidate : getValues(dimension)) {
            params.add(candidate);
            internalExecute(query, execution, results, depth + 1, params);
            // Drop the value that was added at this depth
            params.remove(depth);
        }
        return;
    }
    // No more dimensions: execute with the current parameter combination
    if (LOG.isDebugEnabled()) {
        LOG.debug("Executing: {} params={}", execution.getName(), params);
    }
    for (final Observation observed : getObservationResults(execution, query, params.toArray())) {
        registerObservation(observed, execution, params.toArray(), results);
    }
}
/**
 * Attaches the dimension keys for the current parameter combination to an
 * observation and adds it to the results.
 * <p>
 * Observations that are {@code null} or whose value is {@code 0} are
 * skipped. For every condition a {@link DimensionKey} is looked up by the
 * name of the execution dimension at the same index and the condition's
 * string value ({@code null} maps to the empty string); a transient key is
 * created when none exists. Keys are inserted at the front positions of the
 * observation's dimension list, in condition order.
 *
 * @param observed the observation to register, may be {@code null}
 * @param execution the execution that produced the observation
 * @param conditions the parameter values the query was run with
 * @param results collector receiving the observation
 */
private void registerObservation(Observation observed, Execution execution, Object[] conditions, List<Observation> results) {
    if (observed == null) {
        LOG.debug("Observation is null, skipping");
        return;
    }
    if (observed.getValue() == 0d) {
        LOG.debug("Observation value is 0d, skipping");
        return;
    }
    KPIParameter parameter = execution.getParameter();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Reporting result");
        LOG.debug("{}={}", parameter.getName(), observed.getValue());
        LOG.debug("Conditions: {}", ArrayUtils.toString(conditions));
    }
    for (int i = 0; i < conditions.length; i++) {
        String name = execution.getExecutionDimensions().get(i).toName();
        String value = conditions[i] == null ? "" : conditions[i].toString();
        // Log the null-safe value; calling toString() on the raw condition failed for null conditions
        LOG.debug(" dk name={} val={}", name, value);
        DimensionKey dk = dimensionKeyRepository.findByNameAndValue(name, value);
        if (dk == null) {
            dk = DimensionKey.createFor(name, value);
        }
        LOG.debug("\t\t{}", dk);
        observed.getDimensions().add(i, dk);
    }
    LOG.debug("OBSERVATION: {}", observed);
    results.add(observed);
}
/**
 * Lists the observations of one run, optionally restricted to observations
 * matching the given dimension name/value filters.
 *
 * @param executionRun the run whose observations are listed
 * @param dimensionFilters dimension name to value; {@code null} or empty for no filtering
 * @param page the paging request
 * @return the page of observations with their dimension keys loaded
 */
@Override
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#executionRun.execution, 'READ')")
public Page<Observation> listObservations(ExecutionRun executionRun, Map<String, String> dimensionFilters, Pageable page) {
    final boolean unfiltered = dimensionFilters == null || dimensionFilters.isEmpty();
    final Page<Observation> found;
    if (unfiltered) {
        LOG.debug("Dimension filters not provided");
        found = observationRepository.findByExecutionRun(executionRun, page);
    } else {
        final Set<DimensionKey> filterKeys = getDimensionKeys(dimensionFilters);
        LOG.debug("Got {} dimension keys.", filterKeys.size());
        found = observationRepository.findObservations(executionRun, filterKeys, page);
    }
    // Initialize the dimension keys of the page content
    fetchDimensions(found.getContent());
    return found;
}
/**
 * Lists observations of an execution recorded between two dates, optionally
 * filtered by dimension keys.
 * <p>
 * Every entry of {@code keys} adds one join on the observation's dimension
 * keys requiring a key with that name and one of the listed values. Each
 * join gets its own alias; reusing one alias for several joins declares the
 * same alias twice in the generated query.
 *
 * @param execution the execution whose observations are listed
 * @param from first run date (inclusive, as defined by {@code between})
 * @param to last run date (inclusive, as defined by {@code between})
 * @param keys dimension name to accepted values; {@code null} or empty for no filtering
 * @param page paging and sorting; without a sort, newest observation id first
 * @return tuples of (run timestamp, observation) with dimension keys loaded
 */
@Override
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'READ')")
public Page<Tuple> listObservations(Execution execution, LocalDate from, LocalDate to, final Map<String, Set<String>> keys, Pageable page) {
    final JPAQuery<Tuple> observationQuery = jpaQueryFactory.selectFrom(observation).select(observation.executionRun().timestamp, observation).distinct();
    if (!MapUtils.isEmpty(keys)) {
        int aliasIndex = 0;
        for (var entry : keys.entrySet()) {
            // One distinct alias per filter entry
            final QDimensionKey keyAlias = new QDimensionKey("dimensionKey" + aliasIndex++);
            observationQuery.join(observation.dimensions, keyAlias).on(keyAlias.name.eq(entry.getKey()).and(keyAlias.value.in(entry.getValue())));
        }
    }
    observationQuery.where(observation.executionRun().execution().id.eq(execution.getId()));
    observationQuery.where(observation.executionRun().timestamp.between(from, to));
    // Count before ordering and paging are applied to the (mutable) query
    long total = observationQuery.fetchCount();
    if (page.getSort().iterator().hasNext()) {
        PathBuilder<Observation> entityPath = new PathBuilder<>(Observation.class, "observation");
        page.getSort().forEach(order -> {
            // Translate the Spring sort order into a QueryDSL order specifier
            PathBuilder<?> path = entityPath.get(order.getProperty());
            observationQuery.orderBy(new OrderSpecifier(com.querydsl.core.types.Order.valueOf(order.getDirection().name()), path));
        });
    } else {
        // default sort by newest first
        observationQuery.orderBy(QObservation.observation.id.desc());
    }
    observationQuery.offset(page.getOffset()).limit(page.getPageSize());
    List<Tuple> observations = observationQuery.fetch();
    var allObservations = observations.stream().map(tuple -> tuple.get(observation)).collect(Collectors.toList());
    fetchDimensions(allObservations);
    return new PageImpl<>(observations, page, total);
}
/**
 * Returns the observations of the latest run on or before {@code date},
 * optionally filtered by dimension keys.
 * <p>
 * Every entry of {@code keys} adds one join on the observation's dimension
 * keys requiring a key with that name and one of the listed values. Each
 * join gets its own alias; reusing one alias for several joins declares the
 * same alias twice in the generated query.
 *
 * @param execution the execution whose runs are searched
 * @param date the day up to which runs are considered
 * @param keys dimension name to accepted values; {@code null} or empty for no filtering
 * @return the matching observations with dimension keys loaded; an empty
 *         list when no run exists before the day following {@code date}
 */
@Override
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'READ')")
public List<Observation> filterObservations(final Execution execution, final LocalDate date, final Map<String, Set<String>> keys) {
    JPAQuery<ExecutionRun> executionQuery = jpaQueryFactory.selectFrom(executionRun);
    executionQuery.where(executionRun.execution().eq(execution).and(executionRun.timestamp.before(date.plusDays(1))));
    executionQuery.orderBy(executionRun.timestamp.desc(), executionRun.id.desc());
    executionQuery.limit(1);
    final ExecutionRun latestRun = executionQuery.fetchFirst();
    if (latestRun == null) {
        return Collections.emptyList();
    }
    JPAQuery<Observation> observationQuery = jpaQueryFactory.selectFrom(observation).distinct();
    if (!MapUtils.isEmpty(keys)) {
        int aliasIndex = 0;
        for (var entry : keys.entrySet()) {
            // One distinct alias per filter entry
            final QDimensionKey keyAlias = new QDimensionKey("dimensionKey" + aliasIndex++);
            observationQuery = observationQuery.join(observation.dimensions, keyAlias).on(keyAlias.name.eq(entry.getKey()).and(keyAlias.value.in(entry.getValue())));
        }
    }
    observationQuery.where(observation.executionRun().id.eq(latestRun.getId()));
    List<Observation> observations = observationQuery.fetch();
    fetchDimensions(observations);
    return observations;
}
/**
 * Loads the dimension keys of the given observations with one query and
 * assigns them to each observation.
 * <p>
 * Nothing is queried for a {@code null} or empty list. Observations for
 * which the query returns no keys receive an empty list rather than
 * {@code null}.
 *
 * @param observations the observations to initialize, may be {@code null} or empty
 */
private void fetchDimensions(List<Observation> observations) {
    if (observations == null || observations.isEmpty()) {
        // Avoid a pointless query with an empty IN () predicate
        return;
    }
    var aliasDims = new QDimensionKey("dk");
    var observationDimensions = jpaQueryFactory.from(QObservation.observation)
        .innerJoin(QObservation.observation.dimensions, aliasDims).fetchAll()
        .select(QObservation.observation.id, aliasDims) // Get ID and all its DimensionKeys
        .where(QObservation.observation.in(observations)) // Selected observations only
        .fetch();
    LOG.debug("Fetched {} dimensions for {} observations", observationDimensions.size(), observations.size());
    // observation id -> its dimension keys
    var dimensionKeyMap = new HashMap<Long, List<DimensionKey>>();
    for (var obdim : observationDimensions) {
        var observationId = obdim.get(0, Long.class);
        var dimensionKey = obdim.get(1, DimensionKey.class);
        dimensionKeyMap.computeIfAbsent(observationId, k -> new ArrayList<>()).add(dimensionKey);
    }
    observations.forEach(obs -> {
        // The inner join yields no rows for observations without keys
        List<DimensionKey> keys = dimensionKeyMap.get(obs.getId());
        obs.setDimensions(keys != null ? keys : new ArrayList<DimensionKey>());
    });
}
/**
 * Resolves name/value filters to existing dimension keys.
 * <p>
 * Filters with a {@code null} value are ignored, as are filters for which
 * no dimension key exists.
 *
 * @param dimensionFilters dimension name to value
 * @return the dimension keys that were found
 */
private Set<DimensionKey> getDimensionKeys(Map<String, String> dimensionFilters) {
    return dimensionFilters.entrySet().stream()
        .filter(filter -> filter.getValue() != null)
        .map(filter -> dimensionKeyRepository.findByNameAndValue(filter.getKey(), filter.getValue()))
        .filter(found -> found != null)
        .collect(Collectors.toCollection(HashSet::new));
}
/**
 * Lists the runs of an execution one page at a time, in the order defined
 * by the repository's {@code findLast} query.
 *
 * @param execution the execution whose runs are listed
 * @param pageable the paging request
 * @return the requested page of runs
 */
@Override
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'READ')")
public Page<ExecutionRun> listExecutionRuns(Execution execution, Pageable pageable) {
    final Page<ExecutionRun> runs = executionRunRepository.findLast(execution, pageable);
    return runs;
}
/**
 * Loads an execution run by id and initializes the dimension keys of its
 * observations.
 * <p>
 * The post-authorization expression accepts a {@code null} result, as the
 * execution lookups in this class do. Without that check the expression
 * dereferenced {@code returnObject.execution} for a run that does not exist.
 *
 * @param runId the run id
 * @return the run, or {@code null} when no run has this id
 */
@Override
@PostAuthorize("hasRole('ADMINISTRATOR') or returnObject==null or hasPermission(returnObject.execution, 'READ')")
public ExecutionRun getExecutionRun(long runId) {
    return lazyLoad(executionRunRepository.findById(runId).orElse(null));
}
/**
 * Deletes an execution run.
 *
 * @param run the run to delete
 * @return the instance that was passed in
 */
@Override
@Transactional
@PreAuthorize("hasRole('ADMINISTRATOR')")
public ExecutionRun delete(ExecutionRun run) {
    final ExecutionRun removed = run;
    executionRunRepository.delete(removed);
    return removed;
}
/**
 * Thins out the stored runs of an execution and returns how many runs were deleted.
 * <p>
 * Runs are read in pages of 50 through {@code executionRunRepository.findLast}. The very
 * first run seen is never deleted and becomes the initial reference run. Every following
 * run is kept when one of the retention rules below matches, otherwise it is queued for
 * deletion. Collection stops once 20 runs are queued or no more runs are returned; the
 * queued runs are deleted at the end, after all pages have been read.
 * <p>
 * NOTE(review): the rules read as if {@code findLast} returns the newest runs first —
 * confirm against the repository query.
 *
 * @param execution the execution whose runs are purged
 * @return the number of deleted runs (at most 20 per call)
 */
@Override
@Transactional
@PreAuthorize("hasRole('ADMINISTRATOR')")
public long purgeExecutionRuns(Execution execution) {
Pageable pageable = PageRequest.of(0, 50);
LOG.info("Cleaning runs for id={} {}", execution.getId(), execution.getName());
LOG.debug("Loading runs p={} s={}", pageable.getPageNumber(), pageable.getPageSize());
Page<ExecutionRun> runs = executionRunRepository.findLast(execution, pageable);
// Most recently kept run; the run under inspection is compared against it
ExecutionRun previousRun = null;
List<ExecutionRun> toDelete = new ArrayList<>();
while (runs.hasContent() && toDelete.size() < 20) {
for (ExecutionRun run : runs) {
if (previousRun == null) {
// The first run encountered is always kept and becomes the reference
previousRun = run;
continue;
}
// We have a run to compare against
boolean keepThisRun = false;
ZonedDateTime runLocalDate = run.getTimestamp().atStartOfDay(ZoneOffset.systemDefault());
ZonedDateTime previousRunLocalDate = previousRun.getTimestamp().atStartOfDay(ZoneOffset.systemDefault());
ZonedDateTime nowDateTime = ZonedDateTime.now();
// Age of the run, measured from the start of its day to now
Duration periodToNow = Duration.between(runLocalDate, nowDateTime).abs();
LOG.trace("Inspecting run id={} date={} against previous={} on={}", run.getId(), run.getTimestamp(), previousRun.getId(), previousRun.getTimestamp());
if (periodToNow.toDays() < 10) {
// Younger than 10 days
keepThisRun = true;
LOG.debug("Keeping run younger than 10 days id={} timestamp={}", run.getId(), runLocalDate);
} else if (Math.abs(run.getTotalValue() / previousRun.getTotalValue()) > 1.10) {
// Keep because totals are 10% off.
// NOTE(review): the ratio only exceeds 1.10 when this run's total is more than 10% larger in
// magnitude than the reference run's; a smaller total does not match this rule — confirm intent.
keepThisRun = true;
LOG.debug("Keeping run because totalValue diff>{} id={} timestamp={}", Math.abs(run.getTotalValue() / previousRun.getTotalValue()), run.getId(), runLocalDate);
} else if (periodToNow.toDays() < 100 && runLocalDate.getDayOfWeek() == DayOfWeek.SUNDAY) {
// Keep Sundays (only for runs younger than 100 days)
keepThisRun = true;
LOG.debug("Keeping run on Sunday id={} timestamp={}", run.getId(), runLocalDate);
} else if (runLocalDate.getDayOfMonth() == 1) {
// Keep every 1st of the Month
keepThisRun = true;
LOG.debug("Keeping run on 1st of Month id={} timestamp={}", run.getId(), runLocalDate);
} else if (Duration.between(runLocalDate, previousRunLocalDate).abs().toDays() > 31) {
// Keep if gap to previous kept run is more than 31 days
keepThisRun = true;
LOG.debug("Keeping run with a big gap id={} timestamp={} days={}", run.getId(), runLocalDate, Duration.between(runLocalDate, previousRunLocalDate).abs().toDays());
} else if (! runs.hasNext() && runs.getContent().get(runs.getNumberOfElements() - 1).equals(run)) {
// Last element of the last page: treated as the first ever run and kept
keepThisRun = true;
LOG.debug("Keeping first ever run id={} timestamp={}", run.getId(), runLocalDate);
} else {
// Don't keep
LOG.debug("Meh run id={} timestamp={}", run.getId(), runLocalDate);
}
if (!keepThisRun) {
LOG.info("Removing run id={} timestamp={}", run.getId(), runLocalDate);
toDelete.add(run);
if (toDelete.size() >= 20) {
// Deletion batch is full: stop inspecting this page
LOG.debug("Many runs to delete!");
break;
}
} else {
// KEEP run! It becomes the reference for the following runs
previousRun = run;
}
}
// Load next page (also done after the batch filled up; the while condition then ends the loop)
pageable = pageable.next();
LOG.debug("Loading runs p={} s={}", pageable.getPageNumber(), pageable.getPageSize());
runs = executionRunRepository.findLast(execution, pageable);
}
LOG.info("Deleting {} runs", toDelete.size());
// Deletion happens only now, so paging above was not affected by removed rows
toDelete.forEach(deleteRun -> executionRunRepository.delete(deleteRun));
return (long) toDelete.size();
}
/**
 * Copies the editable state of {@code source} onto {@code target}.
 * <p>
 * Execution dimensions and groups are replaced only when the source
 * provides a non-null collection. The source collections are snapshotted
 * before the target collections are cleared, so the copy also works when
 * both executions share the same collection instance. The former
 * "not already contained" filter was evaluated after the target had been
 * cleared and never excluded anything; it has been dropped.
 *
 * @param target the execution to update
 * @param source the execution providing the new values
 */
private void copyValues(Execution target, Execution source) {
    if (source.getExecutionDimensions() != null) {
        // Snapshot first: clearing the target must not empty the data being copied
        final var sourceDimensions = new ArrayList<>(source.getExecutionDimensions());
        target.getExecutionDimensions().clear();
        target.getExecutionDimensions().addAll(sourceDimensions);
    }
    if (source.getGroups() != null) {
        final var sourceGroups = new ArrayList<>(source.getGroups());
        target.getGroups().clear();
        target.getGroups().addAll(sourceGroups);
    }
    target.setActive(source.isActive());
    target.setLink(source.getLink());
    target.setProperty(source.getProperty());
    target.setParameter(source.getParameter());
    target.setTitle(source.getTitle());
    target.setType(source.getType());
    target.setDescription(source.getDescription());
}
/**
 * Initializes the dimension keys of a run's observations.
 *
 * @param run the run, may be {@code null}
 * @return the same run instance (or {@code null})
 */
private ExecutionRun lazyLoad(ExecutionRun run) {
    if (run == null) {
        return run;
    }
    fetchDimensions(run.getObservations());
    return run;
}
/**
 * Touches the lazy associations of an execution so that they are
 * initialized: the dimension of every execution dimension, the runs
 * collection and the groups collection.
 *
 * @param execution the execution, may be {@code null}
 * @return the same execution instance (or {@code null})
 */
private Execution lazyLoad(Execution execution) {
    if (execution == null) {
        return execution;
    }
    if (execution.getExecutionDimensions() != null) {
        for (var executionDimension : execution.getExecutionDimensions()) {
            lazyLoad(executionDimension.getDimension());
        }
    }
    if (execution.getRuns() != null) {
        // size() forces initialization of the collection
        execution.getRuns().size();
    }
    execution.getGroups().size();
    return execution;
}
/**
 * Initializes the values collection of a non-JPA dimension.
 * <p>
 * {@link JpaDimension}s are left untouched. A {@code null} dimension is
 * returned as-is; previously it caused a NullPointerException because
 * {@code null} is not an instance of {@link JpaDimension}.
 *
 * @param dimension the dimension, may be {@code null}
 * @return the same dimension instance (or {@code null})
 */
private <T extends Dimension<?>> T lazyLoad(T dimension) {
    if (dimension == null) {
        return dimension;
    }
    if (!(dimension instanceof JpaDimension) && dimension.getValues() != null) {
        // size() forces initialization of the collection
        dimension.getValues().size();
    }
    return dimension;
}
/**
 * Calculates day-by-day differences between observations of an execution,
 * walking backwards from {@code to} until {@code from} is reached.
 * <p>
 * Without keys the calculation is delegated to the run-based variant. With
 * keys, the filtered observations of each day are compared with those of
 * the previous day: observations without an earlier counterpart are
 * reported as-is, changed values as the difference, and observations that
 * disappeared as the negated earlier value.
 * <p>
 * A diff is keyed by the timestamp of the run the current observations
 * belong to. When the current snapshot is empty (only removals) there is no
 * such run, and the date the snapshot was requested for is used instead;
 * previously this case failed on {@code currentObservations.get(0)}.
 *
 * @param execution the execution to inspect
 * @param from earliest date
 * @param to latest date
 * @param keys dimension name to accepted values; {@code null} or empty for no filtering
 * @return non-empty diffs, sorted by date
 * @throws InvalidApiUsageException if execution or a date is missing, or {@code from} is after {@code to}
 */
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'READ')")
@Override
public SortedMap<LocalDate, List<Observation>> calculateRunDiff(Execution execution, LocalDate from, LocalDate to, Map<String, Set<String>> keys) {
    if (execution == null) {
        throw new InvalidApiUsageException("Execution must be provided.");
    }
    if (from == null || to == null) {
        throw new InvalidApiUsageException("From and to dates must be provided.");
    }
    if (from.isAfter(to)) {
        throw new InvalidApiUsageException("From date must be before to date.");
    }
    LOG.info("Generating run diff from {} to {}", from, to);
    if (keys == null || keys.keySet().size() == 0)
        return calculateRunDiff(execution, from, to);
    SortedMap<LocalDate, List<Observation>> diffs = new TreeMap<>();
    List<Observation> currentObservations = filterObservations(execution, to, keys);
    LocalDate date = to;
    do {
        date = date.minus(1, ChronoUnit.DAYS);
        List<Observation> earlierObservations = filterObservations(execution, date, keys);
        List<Observation> diff = new LinkedList<>();
        // Additions and changes
        for (Observation observation : currentObservations) {
            Observation earlierObservation = earlierObservations.stream().filter(o -> o.hasDimensionKeys(observation.getDimensions())).findFirst().orElse(null);
            if (earlierObservation == null) {
                LOG.debug("No earlier observation for {}", observation.getDimensions());
                diff.add(observation);
            } else {
                LOG.debug("Have earlier observation for {}", observation.getDimensions());
                // Compare as primitive doubles
                final double currentValue = observation.getValue();
                final double earlierValue = earlierObservation.getValue();
                if (currentValue != earlierValue) {
                    diff.add(new Observation(observation.getDimensions(), currentValue - earlierValue));
                }
            }
        }
        // Removals
        for (Observation earlierObservation : earlierObservations) {
            Observation observation = currentObservations.stream().filter(o -> o.hasDimensionKeys(earlierObservation.getDimensions())).findFirst().orElse(null);
            if (observation == null) {
                LOG.debug("Observation removed for {}", earlierObservation.getDimensions());
                diff.add(new Observation(earlierObservation.getDimensions(), -earlierObservation.getValue()));
            }
        }
        if (!diff.isEmpty()) {
            // The current snapshot was requested for the day after 'date'
            final LocalDate diffDate = currentObservations.isEmpty()
                ? date.plusDays(1)
                : currentObservations.get(0).getExecutionRun().getTimestamp();
            diffs.put(diffDate, diff);
        }
        currentObservations = earlierObservations;
    } while (date.isAfter(from));
    return diffs;
}
/**
 * Calculates differences between consecutive execution runs, walking backwards one day
 * at a time from {@code to} until the inspected date is no longer after {@code from}.
 *
 * <p>Runs are looked up with {@code findExecutionRunByDate}. The walk stops early when no
 * run is found for a date, and a date that resolves to the same run as the current one is
 * skipped. Each non-empty diff is stored under the timestamp of the newer run and holds:
 * observations that exist only in the newer run (as-is), value changes (as a new
 * {@link Observation} carrying the delta) and observations that exist only in the
 * earlier run (with negated value).</p>
 *
 * @param execution the execution whose runs are compared
 * @param from lower bound of the walk
 * @param to date of the newest run to consider
 * @return diffs keyed by the timestamp of the newer run; empty when no run is found for {@code to}
 */
private SortedMap<LocalDate, List<Observation>> calculateRunDiff(Execution execution, LocalDate from, LocalDate to) {
	SortedMap<LocalDate, List<Observation>> diffs = new TreeMap<>();

	// Start at the requested end date so that the [from, to] range is honoured,
	// consistent with the filtered overload
	LocalDate inst = to;
	ExecutionRun currentRun = findExecutionRunByDate(execution, inst);
	if (currentRun == null) {
		return diffs;
	}

	do {
		inst = inst.minusDays(1);
		ExecutionRun earlierRun = findExecutionRunByDate(execution, inst);
		if (earlierRun == null) {
			LOG.debug("No earlier run");
			break;
		}
		if (earlierRun.getId().equals(currentRun.getId())) {
			// The lookup resolved to the run we already hold; try the previous day
			LOG.debug("Not comparing to self");
			continue;
		}

		List<Observation> diff = new LinkedList<>();
		LOG.info("Comparing {} to {}", currentRun.getTimestamp(), earlierRun.getTimestamp());

		var currentObservations = currentRun.getObservations();
		var earlierObservations = earlierRun.getObservations();

		// Additions and changes
		for (Observation current : currentObservations) {
			Observation earlier = earlierObservations.stream().filter(o -> o.hasDimensionKeys(current.getDimensions())).findFirst().orElse(null);
			if (earlier == null) {
				LOG.debug("No earlier observation for {}", current.getDimensions());
				diff.add(current);
			} else {
				LOG.debug("Have earlier observation for {}", current.getDimensions());
				// Compare as primitives so the check is a numeric comparison
				// regardless of how the getter declares its return type
				double currentValue = current.getValue();
				double earlierValue = earlier.getValue();
				if (currentValue != earlierValue) {
					diff.add(new Observation(current.getDimensions(), currentValue - earlierValue));
				}
			}
		}

		// Search for removals
		for (Observation earlier : earlierObservations) {
			boolean stillPresent = currentObservations.stream().anyMatch(o -> o.hasDimensionKeys(earlier.getDimensions()));
			if (!stillPresent) {
				LOG.debug("Observation removed for {}", earlier.getDimensions());
				diff.add(new Observation(earlier.getDimensions(), -earlier.getValue()));
			}
		}

		if (!diff.isEmpty()) {
			diffs.put(currentRun.getTimestamp(), diff);
		}
		currentRun = earlierRun;
	} while (inst.isAfter(from));
	return diffs;
}
/**
 * Collects the filtered observations for every date listed in the request.
 *
 * <p>For each requested date the observations are obtained with
 * {@code filterObservations(execution, date, runsRequest.keys)}. Dates that yield no
 * observations are skipped; the others are stored under the timestamp of the execution
 * run that the first returned observation belongs to.</p>
 *
 * @param execution the execution to read observations of
 * @param runsRequest the requested dates and dimension keys
 * @return observations keyed by execution run timestamp, sorted by that timestamp
 */
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'READ')")
@Override
public SortedMap<LocalDate, List<Observation>> findExecutionRuns(Execution execution, KPIService.ExecutionRunsRequest runsRequest) {
	final SortedMap<LocalDate, List<Observation>> observationsByRun = new TreeMap<>();
	for (LocalDate requestedDate : runsRequest.dates) {
		final List<Observation> matching = filterObservations(execution, requestedDate, runsRequest.keys);
		if (CollectionUtils.isEmpty(matching)) {
			// Nothing recorded for this date
			continue;
		}
		final LocalDate runTimestamp = matching.get(0).getExecutionRun().getTimestamp();
		observationsByRun.put(runTimestamp, matching);
	}
	return observationsByRun;
}
/**
 * Lists the distinct dimension keys attached to observations of the execution's runs.
 *
 * @param execution the execution whose runs are inspected
 * @param dimensionNames when not empty, only dimension keys with one of these names are returned
 * @return distinct dimension keys ordered by name and then by value, both ascending
 */
@Override
public List<DimensionKey> getDimensionKeys(Execution execution, Set<String> dimensionNames) {
	final QExecutionRun run = QExecutionRun.executionRun;
	final QObservation runObservation = new QObservation("o");
	final QDimensionKey dimension = new QDimensionKey("d");

	// Always restrict to the given execution; the name filter is optional
	var predicate = run.execution().eq(execution);
	if (CollectionUtils.isNotEmpty(dimensionNames)) {
		predicate = predicate.and(dimension.name.in(dimensionNames));
	}

	return jpaQueryFactory.from(run)
		.innerJoin(run.observations, runObservation)
		.innerJoin(runObservation.dimensions, dimension)
		.select(dimension).distinct()
		.where(predicate)
		.orderBy(dimension.name.asc(), dimension.value.asc())
		.fetch();
}
/**
 * Groups observed values of the most recent execution runs by the value of one dimension.
 *
 * <p>The execution is reloaded by id, then the latest runs with a timestamp before the day
 * after {@code toDate} (today when {@code toDate} is null) are fetched, newest first. For
 * every run the observations having a dimension named {@code dimensionName} are read, and
 * their values are summed per dimension value and run timestamp.</p>
 *
 * @param execution the execution to inspect
 * @param dimensionName name of the dimension to group by
 * @param toDate last run date to include, or null for today
 * @param maxRuns maximum number of runs to inspect; 10 is used when null or not positive
 * @return one group per distinct dimension value, in no particular order
 */
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#execution, 'READ')")
@Override
public List<GroupedRunObservations> getObservationsGroupedByDimension(Execution execution, String dimensionName, LocalDate toDate, Integer maxRuns) {
	execution = getExecution(execution.getId());

	// Exclusive upper bound: the day after the requested (or current) date
	final LocalDate upperBound = (toDate == null ? LocalDate.now() : toDate).plusDays(1);
	final int runLimit = (maxRuns == null || maxRuns <= 0) ? 10 : maxRuns;

	List<ExecutionRun> latestRuns = jpaQueryFactory.selectFrom(executionRun)
		.where(executionRun.execution().eq(execution).and(executionRun.timestamp.before(upperBound)))
		.orderBy(executionRun.timestamp.desc(), executionRun.id.desc())
		.limit(runLimit)
		.fetch();

	final QObservation obs = QObservation.observation;
	final QDimensionKey dimKey = new QDimensionKey("dk");

	Map<String, GroupedRunObservations> groupsByDimensionValue = new HashMap<>();
	for (ExecutionRun run : latestRuns) {
		final LocalDate runDate = run.getTimestamp();
		var rows = jpaQueryFactory.from(obs)
			.innerJoin(obs.dimensions, dimKey)
			.select(dimKey.name, dimKey.value, obs.value, obs.stdDev)
			.where(obs.executionRun().eq(run).and(dimKey.name.eq(dimensionName)))
			.fetch();

		for (var row : rows) {
			var rowDimensionName = row.get(0, String.class);
			assert (rowDimensionName.equals(dimensionName));
			var rowDimensionValue = row.get(1, String.class);
			double observed = row.get(2, double.class);

			var group = groupsByDimensionValue.get(rowDimensionValue);
			if (group == null) {
				// First time this dimension value is seen: start a new group
				Map<LocalDate, Double> valuesByRun = new TreeMap<>();
				valuesByRun.put(runDate, observed);
				groupsByDimensionValue.put(rowDimensionValue, new GroupedRunObservations(rowDimensionValue, valuesByRun));
			} else {
				// Add to the running total of this run, or start it
				group.runCountResult.merge(runDate, observed, Double::sum);
			}
		}
	}
	return new ArrayList<>(groupsByDimensionValue.values());
}
}