// AccessionRefMatcher.java

/*
 * Copyright 2019 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.genesys.server.service.worker;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys.server.model.genesys.Accession;
import org.genesys.server.model.genesys.AccessionRef;
import org.genesys.server.persistence.AccessionRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.google.common.collect.Lists;

/**
 * Handles matching AccessionRefs to Accessions.
 * 
 * Extracted here so that batch actions can be clearly transactional.
 * No authentication checks in place.
 * 
 * References are processed in batches of {@link #BATCH_SIZE}; for every batch
 * the candidate accessions are fetched with a single repository call and the
 * references are then matched in memory.
 * 
 * @author Matija Obreza
 */
@Component
public class AccessionRefMatcher {
	private static final Logger LOG = LoggerFactory.getLogger(AccessionRefMatcher.class);

	/** Maximum number of accession refs handled by one repository lookup. */
	private static final int BATCH_SIZE = 1000;

	@Autowired
	private AccessionRepository accessionRepository;

	/**
	 * Match accession refs to accessions in Genesys.
	 * 
	 * The accession of every reference is reset and assigned again (it stays
	 * {@code null} when no match is found). Nothing is saved by this method.
	 *
	 * @param <X> the type argument of {@link AccessionRef}
	 * @param <T> the type of AccessionRef
	 * @param accessionRefs the list of accession references
	 * @return a new mutable list with the same (updated) references in the original order
	 */
	@Transactional(readOnly = false)
	public <X, T extends AccessionRef<X>> List<T> rematchAccessionRefs(List<T> accessionRefs) {
		LOG.debug("Linking {} accession refs with accessions", accessionRefs.size());

		final List<T> processedRefs = new ArrayList<>(accessionRefs.size());
		int batchNumber = 0;
		for (List<T> batch : Lists.partition(accessionRefs, BATCH_SIZE)) {
			LOG.info("Batch {} with size {}", ++batchNumber, batch.size());
			processedRefs.addAll(lookupMatchingAccessions(batch));
		}

		LOG.info("Done relinking {} accession refs.", processedRefs.size());
		return processedRefs;
	}

	/**
	 * Re-match accession refs to accessions in Genesys.
	 * 
	 * References for which {@code isNew()} is {@code false} are matched and then
	 * saved through the provided repository, batch by batch. References for which
	 * {@code isNew()} is {@code true} are only matched; this method does not save
	 * them.
	 *
	 * @param <X> the type argument of {@link AccessionRef}
	 * @param <T> the type of AccessionRef
	 * @param accessionRefs the list of accession references
	 * @param accessionRefRepository the repository used to save the existing references
	 * @return the list that was passed in, with its elements updated
	 */
	@Transactional(readOnly = false)
	public <X, T extends AccessionRef<X>> List<T> rematchAccessionRefs(List<T> accessionRefs, JpaRepository<T, Long> accessionRefRepository) {
		LOG.debug("Linking {} accession refs with accessions", accessionRefs.size());

		// Batches are numbered continuously across existing and new records
		int batchNumber = 0;

		// Existing records: match and save in this transaction
		final List<T> existingRefs = accessionRefs.stream().filter(ar -> !ar.isNew()).collect(Collectors.toList());
		for (List<T> batch : Lists.partition(existingRefs, BATCH_SIZE)) {
			LOG.info("Batch {} with size {}", ++batchNumber, batch.size());
			lookupMatchingAccessions(batch);
			try {
				accessionRefRepository.saveAll(batch);
			} catch (RuntimeException e) {
				// Best effort: a failed batch must not stop the remaining batches.
				// The exception is passed as the last argument so the stack trace is logged.
				// NOTE(review): a failure here may already have marked the surrounding
				// transaction rollback-only -- confirm that continuing is intended.
				LOG.warn("Error saving accessionRefs: {}", e.getMessage(), e);
			}
		}

		// New records: only matched here, they are not saved by this method
		final List<T> newRefs = accessionRefs.stream().filter(ar -> ar.isNew()).collect(Collectors.toList());
		for (List<T> batch : Lists.partition(newRefs, BATCH_SIZE)) {
			LOG.info("Batch {} with size {}", ++batchNumber, batch.size());
			lookupMatchingAccessions(batch);
		}

		LOG.info("Done relinking {} accession refs.", accessionRefs.size());
		return accessionRefs;
	}


	/**
	 * Looks for the matching Accession of every AccessionRef and assigns it.
	 * 
	 * Each reference is first cleaned with {@code trimStringsToNull()} and its
	 * accession is reset to {@code null}. The accessions returned by the repository
	 * are then tried in this order:
	 * <ol>
	 * <li>by DOI, when the reference has a non-blank DOI;</li>
	 * <li>by instCode + genus + acceNumb, when the reference has a genus;</li>
	 * <li>by instCode + acceNumb, but only when exactly one of the found accessions
	 * has that number in that institute. If the reference has no genus, it is
	 * copied from the matched accession.</li>
	 * </ol>
	 * 
	 * @param accessionRefs the accessionRefs
	 *
	 * @return the same list, with matching accessions from Genesys assigned
	 */
	private <X, T extends AccessionRef<X>> List<T> lookupMatchingAccessions(List<T> accessionRefs) {

		final StopWatch stopWatch = StopWatch.createStarted();
		accessionRefs.forEach(ref -> ref.trimStringsToNull());

		// One repository call per batch; the meaning of the boolean flag is defined by
		// AccessionRepository.find, which is not visible in this file
		final List<Accession> foundAccessions = accessionRepository.find(true, accessionRefs);
		LOG.info("Found {} matches for {} identifiers after {}ms", foundAccessions.size(), accessionRefs.size(), stopWatch.getTime());

		// DOI -> accession
		final Map<String, Accession> doiAccessions = new HashMap<>();
		// instCode / genus / acceNumb map
		final Map<String, Map<String, Map<String, Accession>>> instGenusMaps = new LinkedHashMap<>();
		// instCode / acceNumb -> all accessions with that number, regardless of genus
		final Map<String, Map<String, List<Accession>>> instAcceNumbLists = new LinkedHashMap<>();

		for (Accession found : foundAccessions) {
			if (found.getDoi() != null) {
				doiAccessions.put(found.getDoi(), found);
			}
			// Register by instCode + genus + acceNumb (a later duplicate replaces an earlier one)
			instGenusMaps.computeIfAbsent(found.getInstCode(), instCode -> new LinkedHashMap<>())
				.computeIfAbsent(found.getGenus(), genus -> new LinkedHashMap<>())
				.put(found.getAccessionNumber(), found);

			// Register by instCode + acceNumb
			instAcceNumbLists.computeIfAbsent(found.getInstCode(), instCode -> new LinkedHashMap<>())
				.computeIfAbsent(found.getAccessionNumber(), acceNumb -> new ArrayList<>())
				.add(found);
		}

		for (T ref : accessionRefs) {
			// Drop any previous link before matching again
			ref.setAccession(null);

			if (StringUtils.isNotBlank(ref.getDoi())) {
				Accession doiAcce = doiAccessions.get(ref.getDoi());
				if (doiAcce != null) {
					// DOI match wins over every other identifier
					ref.setAccession(doiAcce);
					continue;
				}
			}

			if (ref.getGenus() != null) {
				var instGenusMap = instGenusMaps.get(ref.getInstCode());
				if (instGenusMap != null) {
					Map<String, Accession> genusMap = instGenusMap.get(ref.getGenus());
					if (genusMap != null) {
						ref.setAccession(genusMap.get(ref.getAcceNumb()));
					}
				}
			}

			if (ref.getAccession() == null) {
				// Search without genus
				var instAcceNumbs = instAcceNumbLists.get(ref.getInstCode());
				if (instAcceNumbs != null) {
					var acceNumbMatches = instAcceNumbs.get(ref.getAcceNumb());
					if (acceNumbMatches != null && acceNumbMatches.size() == 1) {
						// Only match if we have a single accession number
						Accession onlyMatch = acceNumbMatches.get(0);
						ref.setAccession(onlyMatch);
						if (ref.getGenus() == null) {
							ref.setGenus(onlyMatch.getGenus());
						}
					}
				}
			}

			if (ref.getAccession() == null) {
				LOG.debug("No match for {}", ref);
			}
		}

		LOG.info("Matched {} accession refs after {}ms", accessionRefs.size(), stopWatch.getTime());
		return accessionRefs;
	}
}