// FirehoseJPAListener.java
/*
* Copyright 2021 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.server.component.firehose;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Streams;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.genesys.blocks.model.EmptyModel;
import org.genesys.server.component.firehose.FirehoseEvent.EventType;
import org.springframework.aop.framework.Advised;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This listener is attached to save and remove methods of JPA repositories.
*
* It determines if records are inserted, updated or deleted and prepares and
* fires a Message (for each save/delete call) for the FirehoseEventListener.
*
* The message contains the timestamp, type and a list of object references
* (List<Ref>). Note: save method may generate two events: one for inserts and
* one for updates.
*
* @author Artem Hrybeniuk
* @author Matija Obreza
*/
@Aspect
@Slf4j
// Instantiated in FirehoseConfig! @Component("firehoseJPAListener")
public class FirehoseJPAListener implements InitializingBean {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
private Set<Class<?>> includedClasses;
private Cache<Class<?>, Boolean> includedCache;
public FirehoseJPAListener() {
System.err.println("Made an instance of FirehoseJPAListener: " + this);
}
@Override
public void afterPropertiesSet() throws Exception {
includedClasses = Set.copyOf(includedClasses); // make it immutable
log.warn("Listening for commits on: {}", includedClasses);
includedCache = Caffeine.newBuilder()
.maximumSize(100)
.build();
includedClasses.forEach(cl -> includedCache.put(cl, true));
}
public void setIncludedClasses(Set<Class<?>> includedClasses) {
this.includedClasses = includedClasses;
}
@Around(value = "(execution(* org.springframework.data.jpa.repository.JpaRepository.save(..)) || execution(* org.springframework.data.repository.*.saveAll(..))) && args(saveArg)")
public Object afterPersist(final ProceedingJoinPoint joinPoint, final Object saveArg) throws Throwable {
if (saveArg == null) {
return joinPoint.proceed();
}
Instant timestamp = Instant.now();
try {
if (saveArg instanceof Iterable) {
if (! isIncludedIterable((Iterable<?>) saveArg)) {
return joinPoint.proceed();
}
log.trace("JPA around persist {}", saveArg);
// Create and send all saved objects separately as FirehoseEvent to the FirehoseEventListener
List<FirehoseEvent> firehoseEventList = createFirehoseEvents((Iterable<?>) saveArg, timestamp, null);
var result = joinPoint.proceed();
List<?> resultList = null;
if (result instanceof List) {
resultList = (List<?>) result;
} else {
// In case it isn't a list
log.warn("We got back a weird collection type {}", result.getClass());
resultList = new ArrayList<Object>((Collection<?>) result);
}
for (int i = 0; i < resultList.size(); i++) {
var updated = resultList.get(i);
if (updated instanceof EmptyModel) {
var firehoseEvent = firehoseEventList.get(i);
if (Objects.isNull(firehoseEvent)) {
continue;
}
firehoseEvent.setId(((EmptyModel) updated).getId());
firehoseEvent.setEntity(updated);
applicationEventPublisher.publishEvent(firehoseEvent);
}
}
return result;
} else if (isIncluded(saveArg)) {
log.trace("JPA around persist {}", saveArg);
EmptyModel model = (EmptyModel) saveArg;
// Create and send saved object as FirehoseEvent to the FirehoseEventListener
FirehoseEvent firehoseEvent = createFirehoseEvent(model, timestamp, null);
var result = joinPoint.proceed();
if (result != null && result instanceof EmptyModel) {
firehoseEvent.setId(((EmptyModel) result).getId());
firehoseEvent.setEntity(result);
}
applicationEventPublisher.publishEvent(firehoseEvent);
return result;
} else {
// Don't bother
return joinPoint.proceed();
}
} catch (Throwable e) {
// LOG.error("Error in FirehoseJPAListener: {}", e.getMessage(), e);
throw e;
}
}
/**
* Check if the iterable should be processed.
*
* @param iterable
* @return true if the first element of the iterable is included in Firehose processing
*/
private boolean isIncludedIterable(Iterable<?> iterable) {
var it = iterable.iterator();
return it.hasNext() && isIncluded(it.next());
}
/**
* Is the object included in Firehose processing?
*
* @param object the object
* @return true if the object is included in Firehose processing
*/
private boolean isIncluded(Object object) {
if (object == null) return false;
if (! (object instanceof EmptyModel)) return false;
var objectClass = object.getClass();
return includedCache.get(objectClass, cl -> {
boolean result = false;
for (var i : includedClasses) {
if (i.isAssignableFrom(cl)) {
result = true;
break;
}
}
log.debug("Checking if {} is included: {}", cl, result);
return result;
});
}
@After(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.delete(..)) || execution(* org.springframework.data.repository.*.deleteAll(Iterable)) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteInBatch(Iterable))")
public void afterRemove(final JoinPoint joinPoint) {
final Object[] args = joinPoint.getArgs();
try {
final Object toRemove = args[0];
if (toRemove == null) {
return;
}
Instant timestamp = Instant.now();
if (toRemove instanceof Iterable<?>) {
if (isIncludedIterable((Iterable<?>) toRemove)) {
log.debug("JPA afterRemove: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
List<FirehoseEvent> firehoseEventList = createFirehoseEvents((Iterable<?>) toRemove, timestamp, EventType.DELETE);
// send events!
firehoseEventList.forEach(applicationEventPublisher::publishEvent);
}
} else if (isIncluded(toRemove)) {
log.debug("JPA afterRemove: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
FirehoseEvent firehoseEvent = createFirehoseEvent((EmptyModel) toRemove, timestamp, EventType.DELETE);
// send event!
applicationEventPublisher.publishEvent(firehoseEvent);
}
} catch (Throwable e) {
// LOG.error(e.getMessage(), e);
throw e;
}
}
/**
* Delete all
*
* @param joinPoint the join point
*/
@After(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAll()) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAllInBatch())")
public void afterDeleteAll(final JoinPoint joinPoint) {
try {
log.debug("JPA afterDeleteAll: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
Object proxy = joinPoint.getTarget();
if (proxy instanceof Advised) {
Advised x = (Advised) proxy;
for (Class<?> foo : x.getProxiedInterfaces()) {
for (Type generic : foo.getGenericInterfaces()) {
if (generic instanceof ParameterizedType) {
var typeArg = ((ParameterizedType) generic).getActualTypeArguments()[0];
if (typeArg instanceof Class<?> && includedClasses.contains(typeArg)) {
log.info("Delete all documents for {}", typeArg);
applicationEventPublisher.publishEvent(new FirehoseDeleteAllEvent((Class<?>) typeArg, Instant.now()));
} else {
log.trace("Skipping {}", typeArg);
}
}
}
}
}
} catch (Throwable e) {
// LOG.error(e.getMessage(), e);
throw e;
}
}
/**
* Creates list of FirehoseEvents with type CREATE or UPDATE
*
* @param objects iterable of EmptyModel objects
* @param timestamp the timestamp of JPA event
* @param eventType event type
* @return a {@link List} of objects
*/
private List<FirehoseEvent> createFirehoseEvents(Iterable<?> objects, Instant timestamp, EventType eventType) {
return Streams.stream(objects).map(o -> createFirehoseEvent((EmptyModel) o, timestamp, eventType)).collect(Collectors.toList());
}
/**
* Creates a FirehoseEvent instance
*
* @param <T> the type
* @param object object of type EmptyModel
* @param timestamp the timestamp of JPA event
* @param eventType the event type, can be null
*
* @return an event instance
*/
private <T extends EmptyModel> FirehoseEvent createFirehoseEvent(T object, Instant timestamp, EventType eventType) {
assert(object != null);
if (eventType == EventType.DELETE) {
// Include object for delete events
return new FirehoseEvent(object.getClass(), object.getId(), timestamp, eventType, object);
} else {
// Determine CREATE/UPDATE
return new FirehoseEvent(object.getClass(), object.getId(), timestamp, object.isNew() ? EventType.CREATE : EventType.UPDATE);
}
}
}