// AccessionRefMatcher.java

/*
 * Copyright 2019 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.genesys.server.service.worker;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys.server.model.genesys.Accession;
import org.genesys.server.model.genesys.AccessionRef;
import org.genesys.server.persistence.AccessionRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.google.common.collect.Lists;

/**
 * Handles matching AccessionRefs to Accessions.
 * 
 * Extracted here so that batch actions can be clearly transactional.
 * No authentication checks in place.
 * 
 * @author Matija Obreza
 */
@Component
public class AccessionRefMatcher {
	private static final Logger LOG = LoggerFactory.getLogger(AccessionRefMatcher.class);

	/** Maximum number of accession refs looked up (and saved) per batch. */
	private static final int BATCH_SIZE = 1000;

	@Autowired
	private AccessionRepository accessionRepository;

	/**
	 * Match accession refs to accessions in Genesys.
	 *
	 * The references are processed in batches of {@value #BATCH_SIZE}. Each reference
	 * is updated in place; nothing is saved by this method.
	 *
	 * @param <X> the type parameter of the AccessionRef
	 * @param <T> the type of AccessionRef
	 * @param accessionRefs the list of accession references
	 * @return a new list containing all processed references, in their original order
	 */
	@Transactional(readOnly = false)
	public <X, T extends AccessionRef<X>> List<T> rematchAccessionRefs(List<T> accessionRefs) {
		LOG.debug("Linking {} accession refs with accessions", accessionRefs.size());

		final List<T> processedRefs = new ArrayList<>(accessionRefs.size());
		int batchNumber = 0;

		for (List<T> batch : Lists.partition(accessionRefs, BATCH_SIZE)) {
			LOG.info("Batch {} with size {}", ++batchNumber, batch.size());
			processedRefs.addAll(lookupMatchingAccessions(batch));
		}

		LOG.info("Done relinking {} accession refs.", processedRefs.size());
		return processedRefs;
	}

	/**
	 * Re-match accession refs to accessions in Genesys.
	 *
	 * Only references for which {@code isNew()} returns {@code false} are re-matched
	 * and saved; the others are left untouched. Processing happens in batches of
	 * {@value #BATCH_SIZE}. A failure to save one batch is logged and does not stop
	 * the remaining batches from being processed.
	 *
	 * @param <X> the type parameter of the AccessionRef
	 * @param <T> the type of AccessionRef
	 * @param accessionRefs the list of accession references
	 * @param accessionRefRepository the repository used to save the re-matched references
	 * @return the same list that was passed in as {@code accessionRefs}
	 */
	@Transactional(readOnly = false)
	public <X, T extends AccessionRef<X>> List<T> rematchAccessionRefs(List<T> accessionRefs, JpaRepository<T, Long> accessionRefRepository) {
		LOG.debug("Linking {} accession refs with accessions", accessionRefs.size());

		final List<T> existingRefs = accessionRefs.stream().filter(ar -> !ar.isNew()).collect(Collectors.toList());
		int batchNumber = 0;

		for (List<T> batch : Lists.partition(existingRefs, BATCH_SIZE)) {
			LOG.info("Batch {} with size {}", ++batchNumber, batch.size());
			lookupMatchingAccessions(batch);
			try {
				// Save in this transaction
				accessionRefRepository.saveAll(batch);
			} catch (Exception e) {
				// Best effort: keep going with the next batch, but keep the stack trace in the log.
				// NOTE(review): a failed save may already have marked the surrounding
				// transaction as rollback-only -- confirm that continuing is intended.
				LOG.warn("Error saving accessionRefs: {}", e.getMessage(), e);
			}
		}

		LOG.info("Done relinking {} accession refs.", accessionRefs.size());
		return accessionRefs;
	}


	/**
	 * Looks for the matching Accession of every AccessionRef and sets it on the reference.
	 *
	 * String values of each reference are trimmed to null first. The current accession
	 * of every reference is cleared, then a match by DOI is tried and, when there is
	 * none, a match by institute code, genus and accession number.
	 * 
	 * @param <X> the type parameter of the AccessionRef
	 * @param <T> the type of AccessionRef
	 * @param accessionRefs the accessionRefs
	 *
	 * @return the same accessionRefs list, with matching accessions from Genesys set
	 */
	private <X, T extends AccessionRef<X>> List<T> lookupMatchingAccessions(List<T> accessionRefs) {

		final StopWatch stopWatch = StopWatch.createStarted();
		accessionRefs.forEach(ref -> ref.trimStringsToNull());

		// NOTE(review): the meaning of the boolean flag is defined by AccessionRepository#find -- confirm there
		final List<Accession> foundAccessions = accessionRepository.find(true, accessionRefs);
		LOG.info("Found {} matches for {} identifiers after {}ms", foundAccessions.size(), accessionRefs.size(), stopWatch.getTime());

		// instCode / genus / acceNumb map
		final Map<String, Map<String, Map<String, Accession>>> instMap = new HashMap<>();
		final Map<String, Accession> doiAccessions = new HashMap<>();
		for (Accession found : foundAccessions) {
			if (found.getDoi() != null) {
				doiAccessions.put(found.getDoi(), found);
			}
			instMap.computeIfAbsent(found.getInstCode(), instCode -> new HashMap<>())
				.computeIfAbsent(found.getGenus(), genus -> new HashMap<>())
				.put(found.getAccessionNumber(), found);
		}

		int matched = 0;
		for (T ref : accessionRefs) {
			// Drop any previous link before re-matching
			ref.setAccession(null);

			// A DOI match takes precedence over instCode / genus / acceNumb
			if (StringUtils.isNotBlank(ref.getDoi())) {
				final Accession doiAcce = doiAccessions.get(ref.getDoi());
				if (doiAcce != null) {
					ref.setAccession(doiAcce);
					matched++;
					continue;
				}
			}

			final Map<String, Map<String, Accession>> instGenusMap = instMap.get(ref.getInstCode());
			if (instGenusMap != null) {
				final Map<String, Accession> genusMap = instGenusMap.get(ref.getGenus());
				if (genusMap != null) {
					ref.setAccession(genusMap.get(ref.getAcceNumb()));
				}
			}

			if (ref.getAccession() == null) {
				LOG.debug("No match for {}", ref);
			} else {
				matched++;
			}
		}

		LOG.info("Matched {} of {} accession refs after {}ms", matched, accessionRefs.size(), stopWatch.getTime());
		return accessionRefs;
	}
}