AccessionRefMatcher.java
/*
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.server.service.worker;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys.server.model.genesys.Accession;
import org.genesys.server.model.genesys.AccessionRef;
import org.genesys.server.persistence.AccessionRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.google.common.collect.Lists;
/**
 * Handles matching AccessionRefs to Accessions.
 *
 * Extracted here so that batch actions can be clearly transactional.
 * No authentication checks in place.
 *
 * Matching precedence for each reference, as implemented in
 * {@code lookupMatchingAccessions}:
 * <ol>
 * <li>DOI, when the reference has a non-blank DOI;</li>
 * <li>institute code + genus + accession number, when the reference has a genus;</li>
 * <li>institute code + accession number, but only when exactly one candidate
 * accession has that number.</li>
 * </ol>
 *
 * @author Matija Obreza
 */
@Component
public class AccessionRefMatcher {

  private static final Logger LOG = LoggerFactory.getLogger(AccessionRefMatcher.class);

  /** Maximum number of accession refs handed to the repository in one lookup. */
  private static final int BATCH_SIZE = 1000;

  @Autowired
  private AccessionRepository accessionRepository;

  /**
   * Match accession refs to accessions in Genesys.
   *
   * The references are processed in batches of {@value #BATCH_SIZE}. Every
   * reference has its accession reset and then re-assigned if a match is found;
   * nothing is saved explicitly by this method.
   *
   * @param <X> the type argument of {@link AccessionRef}
   * @param <T> the type of AccessionRef
   * @param accessionRefs the list of accession references
   * @return a new list holding the same reference instances, in input order
   */
  @Transactional(readOnly = false)
  public <X, T extends AccessionRef<X>> List<T> rematchAccessionRefs(List<T> accessionRefs) {
    LOG.debug("Linking {} accession refs with accessions", accessionRefs.size());

    final List<T> processedRefs = new LinkedList<>();
    int batchNumber = 0;
    for (List<T> batch : Lists.partition(accessionRefs, BATCH_SIZE)) {
      LOG.info("Batch {} with size {}", ++batchNumber, batch.size());
      processedRefs.addAll(lookupMatchingAccessions(batch));
    }

    LOG.info("Done relinking {} accession refs.", processedRefs.size());
    return processedRefs;
  }

  /**
   * Re-match accession refs to accessions in Genesys.
   *
   * References that are not new ({@code isNew()} is false) are re-matched and
   * then saved through the supplied repository, batch by batch. References that
   * are new are only re-matched; saving them is left to the caller.
   *
   * @param <X> the type argument of {@link AccessionRef}
   * @param <T> the type of AccessionRef
   * @param accessionRefs the list of accession references
   * @param accessionRefRepository the repository used to save existing references
   * @return the same {@code accessionRefs} list that was passed in
   */
  @Transactional(readOnly = false)
  public <X, T extends AccessionRef<X>> List<T> rematchAccessionRefs(List<T> accessionRefs, JpaRepository<T, Long> accessionRefRepository) {
    LOG.debug("Linking {} accession refs with accessions", accessionRefs.size());

    // One counter for both passes so batch numbers keep increasing in the log
    int batchNumber = 0;

    // Existing records: re-match and save within this transaction
    final List<T> existingRefs = accessionRefs.stream().filter(ar -> !ar.isNew()).collect(Collectors.toList());
    for (List<T> batch : Lists.partition(existingRefs, BATCH_SIZE)) {
      LOG.info("Batch {} with size {}", ++batchNumber, batch.size());
      lookupMatchingAccessions(batch);
      try {
        // Save in this transaction
        accessionRefRepository.saveAll(batch);
      } catch (Exception e) {
        // Best effort: keep processing the remaining batches, but keep the stack trace.
        // NOTE(review): a failure inside the repository call may already have marked
        // the surrounding transaction rollback-only -- confirm this is acceptable.
        LOG.warn("Error saving accessionRefs: {}", e.getMessage(), e);
      }
    }

    // New records are not saved here, only re-matched
    final List<T> newRefs = accessionRefs.stream().filter(ar -> ar.isNew()).collect(Collectors.toList());
    for (List<T> batch : Lists.partition(newRefs, BATCH_SIZE)) {
      LOG.info("Batch {} with size {}", ++batchNumber, batch.size());
      lookupMatchingAccessions(batch);
    }

    LOG.info("Done relinking {} accession refs.", accessionRefs.size());
    return accessionRefs;
  }

  /**
   * Looks up candidate accessions for the references and assigns the best match
   * to each reference.
   *
   * Every reference first has its strings trimmed to null and its accession
   * cleared. A reference with no match is left with a {@code null} accession.
   * When a reference without genus is matched by institute code and accession
   * number, the genus of the matched accession is copied to the reference.
   *
   * @param accessionRefs the accessionRefs
   *
   * @return the same list, with matching accessions from Genesys assigned
   */
  private <X, T extends AccessionRef<X>> List<T> lookupMatchingAccessions(List<T> accessionRefs) {
    final StopWatch stopWatch = StopWatch.createStarted();

    for (T ref : accessionRefs) {
      ref.trimStringsToNull();
    }

    // Presumably returns every accession that could match any of the refs -- verify
    // against AccessionRepository#find
    final List<Accession> foundAccessions = accessionRepository.find(true, accessionRefs);
    LOG.info("Found {} matches for {} identifiers after {}ms", foundAccessions.size(), accessionRefs.size(), stopWatch.getTime());

    // instCode / genus / acceNumb map
    final Map<String, Map<String, Map<String, Accession>>> instGenusMaps = new LinkedHashMap<>();
    // instCode / acceNumb map, keeping all candidates so ambiguity can be detected
    final Map<String, Map<String, List<Accession>>> instAcceNumbLists = new LinkedHashMap<>();
    final Map<String, Accession> doiAccessions = new HashMap<>();

    for (Accession found : foundAccessions) {
      if (found.getDoi() != null) {
        doiAccessions.put(found.getDoi(), found);
      }

      // Register by instCode + genus + acceNumb (a later duplicate replaces an earlier one)
      instGenusMaps.computeIfAbsent(found.getInstCode(), instCode -> new LinkedHashMap<>())
        .computeIfAbsent(found.getGenus(), genus -> new LinkedHashMap<>())
        .put(found.getAccessionNumber(), found);

      // Register by instCode + acceNumb
      instAcceNumbLists.computeIfAbsent(found.getInstCode(), instCode -> new LinkedHashMap<>())
        .computeIfAbsent(found.getAccessionNumber(), acceNumb -> new LinkedList<>())
        .add(found);
    }

    int matchedCount = 0;
    for (T ref : accessionRefs) {
      // Always start from a clean state so stale links do not survive a re-match
      ref.setAccession(null);

      // 1. DOI takes precedence over everything else
      if (StringUtils.isNotBlank(ref.getDoi())) {
        final Accession doiAcce = doiAccessions.get(ref.getDoi());
        if (doiAcce != null) {
          ref.setAccession(doiAcce);
          matchedCount++;
          continue;
        }
      }

      // 2. instCode + genus + acceNumb
      if (ref.getGenus() != null) {
        final var instGenusMap = instGenusMaps.get(ref.getInstCode());
        if (instGenusMap != null) {
          final Map<String, Accession> genusMap = instGenusMap.get(ref.getGenus());
          if (genusMap != null) {
            ref.setAccession(genusMap.get(ref.getAcceNumb()));
          }
        }
      }

      // 3. Search without genus
      if (ref.getAccession() == null) {
        final var instAcceNumbs = instAcceNumbLists.get(ref.getInstCode());
        if (instAcceNumbs != null) {
          final var acceNumbMatches = instAcceNumbs.get(ref.getAcceNumb());
          if (acceNumbMatches != null && acceNumbMatches.size() == 1) {
            // Only match if we have a single accession number
            final Accession onlyMatch = acceNumbMatches.get(0);
            ref.setAccession(onlyMatch);
            if (ref.getGenus() == null) {
              ref.setGenus(onlyMatch.getGenus());
            }
          }
        }
      }

      if (ref.getAccession() == null) {
        LOG.debug("No match for {}", ref);
      } else {
        matchedCount++;
      }
    }

    LOG.info("Matched {} of {} accession refs after {}ms", matchedCount, accessionRefs.size(), stopWatch.getTime());
    return accessionRefs;
  }
}