AccessionRefAspect.java

/*
 * Copyright 2018 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.genesys.server.service.worker;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.lang3.time.StopWatch;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.genesys.server.model.dataset.DatasetAccessionRef;
import org.genesys.server.model.genesys.Accession;
import org.genesys.server.model.genesys.AccessionRef;
import org.genesys.server.model.impl.SubsetAccessionRef;
import org.genesys.server.service.DatasetService;
import org.genesys.server.service.DiversityTreeService;
import org.genesys.server.service.SubsetService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

import com.google.common.collect.Lists;

/**
 * Update {@link AccessionRef} updates when accessions are added or removed from
 * Genesys.
 * 
 * @author Viacheslav Pavlov
 * @author Matija Obreza
 */
@Aspect
@Component
public class AccessionRefAspect {

	/** The Constant LOG. */
	public static final Logger LOG = LoggerFactory.getLogger(AccessionRefAspect.class);

	@Autowired
	private DatasetService datasetService;

	@Autowired
	private SubsetService subsetService;

	@Autowired
	private DiversityTreeService divTreeService;

	@Autowired
	private ApplicationEventPublisher applicationEventPublisher;

	@Around(value = "(execution(* org.genesys.server.persistence.dataset.DatasetAccessionRefRepository.saveAll(*)) || execution(* org.genesys.server.persistence.dataset.DatasetAccessionRefRepository.saveAllAndFlush(*))) && args(saveArg)")
	public Object afterRetDatasetAccessionRefSaveAll(final ProceedingJoinPoint joinPoint, Object saveArg) throws Throwable {
		LOG.trace("afterRetDatasetAccessionRefSaveAll: {}", joinPoint);
		if (saveArg == null) {
			return joinPoint.proceed();
		}
		var arg = saveArg;
		List<DatasetAccessionRef> newRefs = null; // We must only take new accessionRefs, otherwise we'll loop forever
		if (arg instanceof List) {
			newRefs = ((List<DatasetAccessionRef>) arg).stream().filter(ref -> ref.isNew() && ref.getAccession() == null).collect(Collectors.toList());
		}
		Object proceed = joinPoint.proceed();
		if (newRefs != null && newRefs.size() > 0) {
			LOG.warn("Have {} NEW Dataset accession refs that need matching", newRefs.size());
			applicationEventPublisher.publishEvent(new DatasetAccessionRefsUpdateEvent(newRefs));
		}
		return proceed;
	}

	@Around(value = "(execution(* org.genesys.server.persistence.SubsetAccessionRefRepository.saveAll(*)) || execution(* org.genesys.server.persistence.SubsetAccessionRefRepository.saveAllAndFlush(*))) && args(saveArg)")
	public Object afterRetSubsetAccessionRefSaveAll(final ProceedingJoinPoint joinPoint, Object saveArg) throws Throwable {
		LOG.trace("afterRetSubsetAccessionRefSaveAll: {}", joinPoint);
		if (saveArg == null) {
			return joinPoint.proceed();
		}
		var arg = saveArg;
		List<SubsetAccessionRef> newRefs = null; // We must only take new accessionRefs, otherwise we'll loop forever
		if (arg instanceof List) {
			newRefs = ((List<SubsetAccessionRef>) arg).stream().filter(ref -> ref.isNew() && ref.getAccession() == null).collect(Collectors.toList());
		}
		Object proceed = joinPoint.proceed();
		if (newRefs != null && newRefs.size() > 0) {
			LOG.warn("Have {} NEW Subset accession refs that need matching", newRefs.size());
			applicationEventPublisher.publishEvent(new SubsetAccessionRefsUpdateEvent(newRefs));
		}
		return proceed;
	}

	@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
	public void handleDatasetAccessionRefsUpdateEvent(AccessionRefsUpdateEvent<? extends AccessionRef<?>> acceRefUpdates) {
		if (acceRefUpdates.updatedRefs != null) {
			if (acceRefUpdates instanceof DatasetAccessionRefsUpdateEvent) {
				var datasetRefs = (DatasetAccessionRefsUpdateEvent) acceRefUpdates;
				LOG.warn("Rematching {} Dataset accessionRefs after commit", datasetRefs.updatedRefs.size());
				datasetService.batchRematchAccessionRefs(datasetRefs.updatedRefs);
			}
			if (acceRefUpdates instanceof SubsetAccessionRefsUpdateEvent) {
				var subsetRefs = (SubsetAccessionRefsUpdateEvent) acceRefUpdates;
				LOG.warn("Rematching {} Dataset accessionRefs after commit", subsetRefs.updatedRefs.size());
				subsetService.batchRematchAccessionRefs(subsetRefs.updatedRefs);
			}
		}
	}

	static class DatasetAccessionRefsUpdateEvent extends AccessionRefsUpdateEvent<DatasetAccessionRef> {
		public DatasetAccessionRefsUpdateEvent(Collection<DatasetAccessionRef> refs) {
			super(refs);
		}
	}

	static class SubsetAccessionRefsUpdateEvent extends AccessionRefsUpdateEvent<SubsetAccessionRef> {
		public SubsetAccessionRefsUpdateEvent(Collection<SubsetAccessionRef> refs) {
			super(refs);
		}
	}

	static class AccessionRefsUpdateEvent<T extends AccessionRef<?>> {
		List<T> updatedRefs;
		public AccessionRefsUpdateEvent(Collection<T> refs) {
			updatedRefs = refs.stream().filter(ref -> ref.getAccession() == null).collect(Collectors.toList()); // copy list
		}
	}

	@Before(value = "execution(* org.genesys.server.persistence.AccessionRepository.delete(*)) && args(arg)")
	public void beforeDeleteAccession(JoinPoint joinPoint, Object arg) throws Throwable {

		StopWatch stopWatch=StopWatch.createStarted();
		LOG.trace("Before delete accession" + arg);
		if (arg == null) {
			return;
		}
		if (arg instanceof Collection<?>) {
			removeRefs((Collection<Accession>) arg);
			LOG.warn("Dereferencing AccessionRefs for {} accession took {}ms", ((Collection<?>) arg).size(), stopWatch.getTime(TimeUnit.MILLISECONDS));
		} else if (arg instanceof Accession) {
			removeRefs(Lists.newArrayList((Accession) arg));
			LOG.warn("Dereferencing AccessionRefs for 1 accession took {}ms", stopWatch.getTime(TimeUnit.MILLISECONDS));
		}
	}
	
	private void removeRefs(Collection<Accession> accessions) {
		datasetService.clearAccessionRefs(accessions);
		subsetService.clearAccessionRefs(accessions);
		divTreeService.clearAccessionRefs(accessions);
	}

}