AccessionLastChangedDateUpdater.java

/*
 * Copyright 2024 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.genesys.server.service.worker;

import com.querydsl.core.Tuple;
import com.querydsl.core.types.ExpressionUtils;
import com.querydsl.core.types.dsl.Expressions;
import com.querydsl.jpa.impl.JPAQueryFactory;

import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.genesys.blocks.auditlog.model.AuditAction;
import org.genesys.blocks.auditlog.model.QAuditLog;
import org.genesys.server.model.genesys.Accession;
import org.genesys.server.model.genesys.AccessionAlias;
import org.genesys.server.model.genesys.AccessionCollect;
import org.genesys.server.model.genesys.AccessionId;
import org.genesys.server.model.genesys.AccessionRemark;
import org.genesys.server.model.genesys.QAccession;
import org.genesys.server.model.genesys.QAccessionAlias;
import org.genesys.server.model.genesys.QAccessionCollect;
import org.genesys.server.model.genesys.QAccessionRemark;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Set;

@Component
@Slf4j
public class AccessionLastChangedDateUpdater {

	@Autowired
	private JPAQueryFactory jpaQueryFactory;

	@Autowired
	private AccessionProcessor accessionProcessor;
	
	@Scheduled(initialDelay = 1, fixedDelay = 2, timeUnit = java.util.concurrent.TimeUnit.HOURS) // Every two hours
	@SchedulerLock(name = "org.genesys.server.service.worker.AccessionLastChangedDateUpdater", lockAtLeastFor = "PT1H", lockAtMostFor = "PT1H15M") // Keep locked for 1 hour so it doesn't execute on another node
	@Transactional
	public void runLastChangedDateUpdater() {

		Instant inst = Instant.now().minus(3, ChronoUnit.HOURS); // Scan last three hours of audit logs

		var auditLogPath = QAuditLog.auditLog;

		var updatedAccessions = new HashMap<Long, Instant>();

		addAllAccessionUpdates(updatedAccessions, jpaQueryFactory.selectDistinct(auditLogPath.entityId, auditLogPath.logDate).from(auditLogPath)
			.where(
				auditLogPath.logDate.after(inst)
					.and(auditLogPath.action.eq(AuditAction.UPDATE))
					.and(Expressions.anyOf(
						// Accession and relevant properties
						auditLogPath.classPk.classname.eq(Accession.class.getName()).and(auditLogPath.propertyName.in(Set.of(
							"doi", "instituteCode", "genus", "institute", "accessionNumber", "cropName",
							"taxonomy", "acquisitionSource", "acquisitionDate", "origCty", "sampStat", "historic",
							"mlsStatus", "acceUrl", "accNames", "otherIds",
							"donorCode", "donorName", "donorNumb", "ancest", "curationType"
						))),
						// AccessionId and relevant properties
						auditLogPath.classPk.classname.eq(AccessionId.class.getName()).and(auditLogPath.propertyName.in(Set.of(
							"storage", "duplSite", "coll", "aliases", "remarks", "breederCode", "breederName"
						)))
					))
			)
			.fetch()
		);

		// AccessionCollect and all its properties
		addAllAccessionUpdates(updatedAccessions, jpaQueryFactory
			.select(QAccessionCollect.accessionCollect.accession().id, auditLogPath.logDate)
			.from(QAccessionCollect.accessionCollect)
			.innerJoin(auditLogPath).on(auditLogPath.entityId.eq(QAccessionCollect.accessionCollect.id))
			.where(
				auditLogPath.logDate.after(inst)
					.and(auditLogPath.action.eq(AuditAction.UPDATE))
					.and(auditLogPath.classPk.classname.eq(AccessionCollect.class.getName()))
			).fetch()
		);

		// AccessionRemark and all its properties
		addAllAccessionUpdates(updatedAccessions, jpaQueryFactory
			.select(QAccessionRemark.accessionRemark.accession().id, auditLogPath.logDate)
			.from(QAccessionRemark.accessionRemark)
			.innerJoin(auditLogPath).on(auditLogPath.entityId.eq(QAccessionRemark.accessionRemark.id))
			.where(
				auditLogPath.logDate.after(inst)
					.and(auditLogPath.action.eq(AuditAction.UPDATE))
					.and(auditLogPath.classPk.classname.eq(AccessionRemark.class.getName()))
			).fetch()
		);

		// AccessionAlias and all its properties
		addAllAccessionUpdates(updatedAccessions, jpaQueryFactory
			.select(QAccessionAlias.accessionAlias.accession().id, auditLogPath.logDate)
			.from(QAccessionAlias.accessionAlias)
			.innerJoin(auditLogPath).on(auditLogPath.entityId.eq(QAccessionAlias.accessionAlias.id))
			.where(
				auditLogPath.logDate.after(inst)
					.and(auditLogPath.action.eq(AuditAction.UPDATE))
					.and(auditLogPath.classPk.classname.eq(AccessionAlias.class.getName()))
			).fetch()
		);

		log.info("Found {} changes to accession passport data", updatedAccessions.size());

		accessionProcessor.apply(
			ExpressionUtils.anyOf(
				QAccession.accession.createdDate.after(inst),
				QAccession.accession.id.in(updatedAccessions.keySet())
			),
			(accessions) -> {
				log.info("Updating lastChangedDate for {} accessions", accessions.size());
				accessions.forEach(accession -> {
					var d = updatedAccessions.getOrDefault(accession.getId(), accession.getCreatedDate());
					jpaQueryFactory.update(QAccession.accession)
						.set(QAccession.accession.lastChangedDate, d)
						.where(QAccession.accession.id.eq(accession.getId())
							.and(QAccession.accession.lastChangedDate.ne(d)))
						.execute();
				});
			}
		);
		
	}

	private void addAllAccessionUpdates(HashMap<Long, Instant> updatedAccessions, List<Tuple> updatedLogs) {
		updatedLogs.forEach(log -> {
			var updatedInstant = log.get(1, Instant.class);
			// Use the most recent Instant
			updatedAccessions.compute(log.get(0, Long.class), (k, existingValue) -> existingValue == null ? updatedInstant : existingValue.isBefore(updatedInstant) ? updatedInstant : existingValue);
		});
	}
}