FirehoseJPAListener.java

/*
 * Copyright 2021 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.genesys.server.component.firehose;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Streams;

import lombok.extern.slf4j.Slf4j;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.genesys.blocks.model.EmptyModel;
import org.genesys.server.component.firehose.FirehoseEvent.EventType;
import org.springframework.aop.framework.Advised;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * This listener is attached to save and delete methods of JPA repositories.
 *
 * For every saved or deleted entity whose type is tracked (see
 * {@link #setIncludedClasses(Set)}) it publishes one {@link FirehoseEvent}
 * through the {@link ApplicationEventPublisher}:
 * <ul>
 * <li>save / saveAll: one CREATE or UPDATE event per entity, published after the
 * repository call has returned,</li>
 * <li>delete / deleteAll(Iterable) / deleteInBatch(Iterable): one DELETE event
 * per entity,</li>
 * <li>deleteAll() / deleteAllInBatch(): one {@link FirehoseDeleteAllEvent} per
 * tracked entity type handled by the repository.</li>
 * </ul>
 *
 * Events are only published when the intercepted repository method completes
 * normally.
 *
 * @author Artem Hrybeniuk
 * @author Matija Obreza
 */
@Aspect
@Slf4j
// Instantiated in FirehoseConfig! @Component("firehoseJPAListener")
public class FirehoseJPAListener implements InitializingBean {

	@Autowired
	private ApplicationEventPublisher applicationEventPublisher;

	/** Entity types (including their subtypes) for which events are published. */
	private Set<Class<?>> includedClasses;

	/** Per-class memo of the "is this class tracked?" decision made by {@link #isIncluded(Object)}. */
	private Cache<Class<?>, Boolean> includedCache;

	/**
	 * Creates the listener. {@link #setIncludedClasses(Set)} must be called
	 * before {@link #afterPropertiesSet()}.
	 */
	public FirehoseJPAListener() {
		// Use the logger instead of writing debugging output to stderr
		log.debug("Made an instance of FirehoseJPAListener: {}", this);
	}

	/**
	 * Freezes the configured set of tracked classes and seeds the lookup cache
	 * with them.
	 *
	 * @throws NullPointerException if no included classes were configured
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		Objects.requireNonNull(includedClasses, "includedClasses must be set before FirehoseJPAListener is initialized");
		includedClasses = Set.copyOf(includedClasses); // make it immutable
		log.warn("Listening for commits on: {}", includedClasses);

		includedCache = Caffeine.newBuilder()
			.maximumSize(100)
			.build();

		includedClasses.forEach(cl -> includedCache.put(cl, true));
	}

	/**
	 * Sets the entity types to track. Subtypes of the listed classes are tracked
	 * as well (see {@link #isIncluded(Object)}).
	 *
	 * @param includedClasses the tracked entity types
	 */
	public void setIncludedClasses(Set<Class<?>> includedClasses) {
		this.includedClasses = includedClasses;
	}

	/**
	 * Wraps repository save/saveAll calls and publishes a {@link FirehoseEvent}
	 * for each tracked entity once the repository call has returned.
	 *
	 * The events are prepared before proceeding because the CREATE/UPDATE
	 * decision reads {@code isNew()} of the argument (presumably this changes
	 * once the entity is persisted). The id and entity of each event are then
	 * taken from the object returned by the repository.
	 *
	 * @param joinPoint the intercepted repository call
	 * @param saveArg the entity or the iterable of entities passed to the repository
	 * @return whatever the repository method returned
	 * @throws Throwable anything thrown by the repository method
	 */
	@Around(value = "(execution(* org.springframework.data.jpa.repository.JpaRepository.save(..)) || execution(* org.springframework.data.repository.*.saveAll(..))) && args(saveArg)")
	public Object afterPersist(final ProceedingJoinPoint joinPoint, final Object saveArg) throws Throwable {
		if (saveArg == null) {
			return joinPoint.proceed();
		}

		final Instant timestamp = Instant.now();

		if (saveArg instanceof Iterable) {
			final Iterable<?> entities = (Iterable<?>) saveArg;
			if (! isIncludedIterable(entities)) {
				return joinPoint.proceed();
			}
			log.trace("JPA around persist {}", saveArg);

			// Create the events up front, publish them one by one after the save
			final List<FirehoseEvent> firehoseEventList = createFirehoseEvents(entities, timestamp, null);
			final Object result = joinPoint.proceed();
			publishSavedEvents(firehoseEventList, result);
			return result;
		}

		if (isIncluded(saveArg)) {
			log.trace("JPA around persist {}", saveArg);

			// Create and send saved object as FirehoseEvent to the FirehoseEventListener
			final FirehoseEvent firehoseEvent = createFirehoseEvent((EmptyModel) saveArg, timestamp, null);
			final Object result = joinPoint.proceed();
			if (result instanceof EmptyModel) {
				firehoseEvent.setId(((EmptyModel) result).getId());
				firehoseEvent.setEntity(result);
			}
			applicationEventPublisher.publishEvent(firehoseEvent);
			return result;
		}

		// Don't bother
		return joinPoint.proceed();
	}

	/**
	 * Completes the prepared events with the saved entities and publishes them.
	 *
	 * Events and results are matched by position. This method never throws for
	 * an unexpected result shape: the save has already succeeded at this point
	 * and must not be failed by the listener.
	 *
	 * @param firehoseEventList events prepared before the save, may contain {@code null} placeholders
	 * @param result the value returned by the repository for the bulk save
	 */
	private void publishSavedEvents(final List<FirehoseEvent> firehoseEventList, final Object result) {
		final List<?> resultList;
		if (result instanceof List) {
			resultList = (List<?>) result;
		} else if (result instanceof Iterable) {
			// In case it isn't a list
			log.warn("We got back a weird collection type {}", result.getClass());
			final List<Object> copy = new ArrayList<>();
			((Iterable<?>) result).forEach(copy::add);
			resultList = copy;
		} else {
			log.warn("Bulk save returned {} instead of an Iterable, no events published", result == null ? null : result.getClass());
			return;
		}

		if (resultList.size() != firehoseEventList.size()) {
			log.warn("Bulk save returned {} elements for {} prepared events", resultList.size(), firehoseEventList.size());
		}

		// Only pair up positions that exist on both sides
		final int count = Math.min(resultList.size(), firehoseEventList.size());
		for (int i = 0; i < count; i++) {
			final Object updated = resultList.get(i);
			final FirehoseEvent firehoseEvent = firehoseEventList.get(i);
			if (firehoseEvent == null || !(updated instanceof EmptyModel)) {
				continue;
			}
			firehoseEvent.setId(((EmptyModel) updated).getId());
			firehoseEvent.setEntity(updated);
			applicationEventPublisher.publishEvent(firehoseEvent);
		}
	}

	/**
	 * Check if the iterable should be processed.
	 * 
	 * Only the first element is inspected.
	 * 
	 * @param iterable the iterable passed to the repository
	 * @return true if the first element of the iterable is included in Firehose processing
	 */
	private boolean isIncludedIterable(Iterable<?> iterable) {
		var it = iterable.iterator();
		return it.hasNext() && isIncluded(it.next());
	}

	/**
	 * Is the object included in Firehose processing?
	 * 
	 * An object is included when it is an {@link EmptyModel} and its class is
	 * assignable to one of the configured classes. The decision is cached per class.
	 * 
	 * @param object the object
	 * @return true if the object is included in Firehose processing
	 */
	private boolean isIncluded(Object object) {
		if (! (object instanceof EmptyModel)) {
			// Also covers null
			return false;
		}

		return includedCache.get(object.getClass(), cl -> {
			boolean result = false;
			for (var included : includedClasses) {
				if (included.isAssignableFrom(cl)) {
					result = true;
					break;
				}
			}
			log.debug("Checking if {} is included: {}", cl, result);
			return result;
		});
	}

	/**
	 * Publishes DELETE events for entities removed through the repository.
	 *
	 * Runs only when the repository method returns normally, so no event is
	 * published for a delete that threw.
	 *
	 * @param joinPoint the join point
	 */
	@AfterReturning(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.delete(..)) || execution(* org.springframework.data.repository.*.deleteAll(Iterable)) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteInBatch(Iterable))")
	public void afterRemove(final JoinPoint joinPoint) {
		final Object[] args = joinPoint.getArgs();
		if (args == null || args.length == 0 || args[0] == null) {
			return;
		}
		final Object toRemove = args[0];
		final Instant timestamp = Instant.now();

		if (toRemove instanceof Iterable) {
			final Iterable<?> entities = (Iterable<?>) toRemove;
			if (isIncludedIterable(entities)) {
				log.debug("JPA afterRemove: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
				// send events! Placeholders of untracked elements are skipped.
				createFirehoseEvents(entities, timestamp, EventType.DELETE).stream()
					.filter(Objects::nonNull)
					.forEach(applicationEventPublisher::publishEvent);
			}

		} else if (isIncluded(toRemove)) {

			log.debug("JPA afterRemove: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
			final FirehoseEvent firehoseEvent = createFirehoseEvent((EmptyModel) toRemove, timestamp, EventType.DELETE);
			// send event!
			applicationEventPublisher.publishEvent(firehoseEvent);
		}
	}


	/**
	 * Delete all
	 *
	 * Publishes a {@link FirehoseDeleteAllEvent} for each tracked entity type
	 * found as the first type argument of the generic interfaces of the
	 * repository interfaces. Runs only when the repository method returns normally.
	 *
	 * @param joinPoint the join point
	 */
	@AfterReturning(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAll()) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAllInBatch())")
	public void afterDeleteAll(final JoinPoint joinPoint) {
		log.debug("JPA afterDeleteAll: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
		final Object target = joinPoint.getTarget();
		if (! (target instanceof Advised)) {
			return;
		}

		for (Class<?> repositoryInterface : ((Advised) target).getProxiedInterfaces()) {
			for (Type generic : repositoryInterface.getGenericInterfaces()) {
				if (! (generic instanceof ParameterizedType)) {
					continue;
				}
				var typeArg = ((ParameterizedType) generic).getActualTypeArguments()[0];
				if (typeArg instanceof Class<?> && includedClasses.contains(typeArg)) {
					log.info("Delete all documents for {}", typeArg);
					applicationEventPublisher.publishEvent(new FirehoseDeleteAllEvent((Class<?>) typeArg, Instant.now()));
				} else {
					log.trace("Skipping {}", typeArg);
				}
			}
		}
	}

	
	/**
	 * Creates one FirehoseEvent per element of the iterable.
	 * 
	 * The returned list is positionally aligned with the iterable: elements that
	 * are not {@link EmptyModel} instances (including {@code null}) produce a
	 * {@code null} entry instead of failing the repository call.
	 * 
	 * @param objects iterable of EmptyModel objects
	 * @param timestamp the timestamp of JPA event
	 * @param eventType event type, {@code null} to determine CREATE/UPDATE per object
	 * @return a {@link List} of events, possibly containing {@code null} placeholders
	 */
	private List<FirehoseEvent> createFirehoseEvents(Iterable<?> objects, Instant timestamp, EventType eventType) {
		return Streams.stream(objects)
			.map(o -> o instanceof EmptyModel ? createFirehoseEvent((EmptyModel) o, timestamp, eventType) : null)
			.collect(Collectors.toList());
	}

	/**
	 * Creates a FirehoseEvent instance
	 * 
	 * @param <T> the type
	 * @param object object of type EmptyModel
	 * @param timestamp the timestamp of JPA event
	 * @param eventType the event type, can be null. Anything other than DELETE
	 *        is replaced by CREATE or UPDATE depending on {@code object.isNew()}
	 * 
	 * @return an event instance
	 */
	private <T extends EmptyModel> FirehoseEvent createFirehoseEvent(T object, Instant timestamp, EventType eventType) {
		assert(object != null);
		if (eventType == EventType.DELETE) {
			// Include object for delete events
			return new FirehoseEvent(object.getClass(), object.getId(), timestamp, eventType, object);
		} else {
			// Determine CREATE/UPDATE
			return new FirehoseEvent(object.getClass(), object.getId(), timestamp, object.isNew() ? EventType.CREATE : EventType.UPDATE);
		}
	}

}