GenotypeServiceImpl.java

/*
 * Copyright 2025 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.genesys.server.service.impl;

import com.google.common.collect.Lists;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.bean.CsvToBean;
import com.opencsv.bean.CsvToBeanBuilder;
import com.opencsv.enums.CSVReaderNullFieldIndicator;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.collections4.CollectionUtils;
import org.genesys.blocks.security.SecurityContextUtil;
import org.genesys.blocks.util.TransactionHelper;
import org.genesys.server.api.v2.MultiOp;
import org.genesys.server.component.firehose.FirehoseEvent;
import org.genesys.server.component.firehose.FirehoseEvent.EventType;
import org.genesys.server.exception.InvalidApiUsageException;
import org.genesys.server.exception.NotFoundElement;
import org.genesys.server.exception.SearchException;
import org.genesys.server.model.genesys.Accession;
import org.genesys.server.model.genesys.AccessionRef;
import org.genesys.server.model.impl.FaoInstitute;
import org.genesys.server.model.impl.Genotype;
import org.genesys.server.model.impl.QGenotype;
import org.genesys.server.persistence.AccessionRepository;
import org.genesys.server.persistence.GenotypeRepository;
import org.genesys.server.service.GenotypeService;
import org.genesys.server.service.InstituteService;
import org.genesys.server.service.filter.GenotypeFilter;
import org.genesys.server.service.filter.InstituteFilter;
import org.genesys.server.service.worker.AccessionRefMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.access.prepost.PostAuthorize;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * {@link GenotypeService} implementation for managing {@link Genotype} records.
 *
 * <p>
 * Except for {@link #listGenotypes(GenotypeFilter, Pageable)} and {@link #get(long)}, operations are
 * restricted either to users with the {@code ADMINISTRATOR} role or to users holding permissions on
 * the institutes the affected genotypes belong to (see {@link #checkPermissions(List)}).
 * </p>
 *
 * <p>
 * The class is read-only transactional by default; mutating methods declare their own
 * {@code @Transactional}.
 * </p>
 */
@Service
@Transactional(readOnly = true)
@Slf4j
public class GenotypeServiceImpl extends FilteredCRUDService2Impl<Genotype, GenotypeFilter, GenotypeRepository> implements GenotypeService {

	/** Character encoding used when reading uploaded CSV data. */
	private static final String ENCODING = "UTF-8";

	/** Escape character used by the CSV parser. */
	private static final char ESCAPE_CHAR = '\\';

	/** Maximum number of genotypes handed to one background re-matching task. */
	private static final int REMATCH_BATCH_SIZE = 2000;

	@Autowired
	private InstituteService instituteService;

	@Autowired
	private AccessionRefMatcher accessionRefMatcher;

	@Autowired
	private ThreadPoolTaskExecutor taskExecutor;

	@Autowired
	private AccessionRepository accessionRepository;

	@Autowired
	private ApplicationEventPublisher applicationEventPublisher;

	@Autowired
	private TransactionHelper transactionHelper;

	/**
	 * Load a genotype by its identifier. Delegates to the superclass without any additional checks.
	 *
	 * @param id genotype identifier
	 * @return the result of {@code super.get(id)}
	 */
	@Override
	public Genotype get(long id) {
		return super.get(id);
	}

	/**
	 * List genotypes matching the filter without the per-institute restriction applied by
	 * {@link #list(GenotypeFilter, Pageable)} (public access).
	 *
	 * @param genotypeFilter filter to apply
	 * @param page pagination request
	 * @return one page of matching genotypes
	 * @throws SearchException propagated from the superclass listing
	 */
	@Override
	public Page<Genotype> listGenotypes(GenotypeFilter genotypeFilter, Pageable page) throws SearchException {
		return super.list(genotypeFilter, page);
	}

	/**
	 * List genotypes matching the filter. For callers without the {@code ADMINISTRATOR} role the filter
	 * is additionally AND-ed with a restriction to the institutes returned by
	 * {@link InstituteService#listMyInstitutes(Sort)}.
	 *
	 * @param filter filter to apply; {@code null} is treated as an empty filter for non-administrators
	 * @param page pagination request
	 * @return one page of matching genotypes
	 * @throws AccessDeniedException if a non-administrator has no institutes
	 * @throws SearchException propagated from the superclass listing
	 */
	@Override
	public Page<Genotype> list(GenotypeFilter filter, Pageable page) throws SearchException {
		if (!SecurityContextUtil.hasRole("ADMINISTRATOR")) {
			filter = filter == null ? new GenotypeFilter() : filter;
			var myInstitutes = instituteService.listMyInstitutes(Sort.unsorted());
			if (CollectionUtils.isEmpty(myInstitutes)) {
				throw new AccessDeniedException("No permission for any institute");
			}
			// Note: the caller's filter instance is modified in place by AND(...)
			filter.AND(new GenotypeFilter().list((InstituteFilter) new InstituteFilter().id(myInstitutes.stream().map(FaoInstitute::getId).collect(Collectors.toSet()))));
		}
		return super.list(filter, page);
	}

	/**
	 * Create multiple genotypes. Same as {@link #createFast(List)}.
	 *
	 * @param inserts genotypes to create
	 * @return result of the bulk operation
	 */
	@Override
	@Transactional
	public MultiOp<Genotype> create(List<Genotype> inserts) {
		return createFast(inserts);
	}

	/**
	 * Create multiple genotypes after checking the caller's permissions and assigning each genotype
	 * the institute resolved from its {@code instCode}.
	 *
	 * @param inserts genotypes to create
	 * @return result of the bulk operation
	 * @throws AccessDeniedException if the caller lacks permission for any referenced institute
	 * @throws NotFoundElement if an institute code cannot be resolved
	 */
	@Override
	@Transactional
	public MultiOp<Genotype> createFast(List<Genotype> inserts) {
		checkPermissions(inserts);
		// Resolve every distinct institute code once. Collectors.toMap rejects null values with a bare
		// NullPointerException, so an unresolved code is reported explicitly instead.
		var institutesByCode = inserts.stream().map(Genotype::getInstCode).distinct().collect(Collectors.toMap(code -> code, code -> {
			var institute = instituteService.getInstitute(code);
			if (institute == null) {
				throw new NotFoundElement("Institute not found: " + code);
			}
			return institute;
		}));
		inserts.forEach(g -> g.setList(institutesByCode.get(g.getInstCode())));
		return super.createFast(inserts);
	}

	/**
	 * Create a single genotype. Same as {@link #createFast(Genotype)}. Administrators only.
	 *
	 * @param source genotype data
	 * @return the persisted genotype
	 */
	@Override
	@Transactional
	// Checked before execution: the expression does not depend on the result, and a post-check
	// would run only after the save has already been executed.
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public Genotype create(Genotype source) {
		return createFast(source);
	}

	/**
	 * Create a single genotype from a copy of {@code source}. Administrators only.
	 *
	 * @param source genotype data applied to a new {@link Genotype} instance
	 * @return the persisted genotype
	 * @throws NotFoundElement if the new genotype has no institute ({@code list}) after applying {@code source}
	 */
	@Override
	@Transactional
	// Checked before execution: the expression does not depend on the result, and a post-check
	// would run only after the save has already been executed.
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public Genotype createFast(Genotype source) {
		Genotype genotype = new Genotype();
		genotype.apply(source);
		if (genotype.getList() == null) {
			throw new NotFoundElement("Institute not found: " + genotype.getInstCode());
		}
		return repository.save(genotype);
	}

	/**
	 * Update a genotype. Same as {@link #updateFast(Genotype, Genotype)}.
	 *
	 * @param updated new data
	 * @param target existing genotype to modify
	 * @return the saved genotype
	 */
	@Override
	@Transactional
	@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#target.list, 'write')")
	public Genotype update(Genotype updated, Genotype target) {
		return updateFast(updated, target);
	}

	/**
	 * Apply {@code updated} to {@code target} and save it. Requires the {@code ADMINISTRATOR} role or
	 * {@code write} permission on the target's institute.
	 *
	 * @param updated new data
	 * @param target existing genotype to modify
	 * @return the saved genotype
	 */
	@Override
	@Transactional
	@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#target.list, 'write')")
	public Genotype updateFast(Genotype updated, Genotype target) {
		target.apply(updated);
		return repository.save(target);
	}

	/**
	 * Remove a single genotype.
	 *
	 * @param entity genotype to remove
	 * @return the removed genotype as returned by the superclass
	 */
	@Override
	@Transactional
	// NOTE(review): the permission is evaluated on the returned object, i.e. after super.remove(...)
	// has run. Whether the removal is rolled back on denial depends on interceptor ordering — confirm,
	// or switch to a pre-check on #entity.list if the argument always carries its institute.
	@PostAuthorize("hasRole('ADMINISTRATOR') || hasPermission(returnObject.list, 'delete')")
	public Genotype remove(Genotype entity) {
		return super.remove(entity);
	}

	/**
	 * Remove all genotypes matching the filter. Permissions are checked by {@link #remove(List)}.
	 *
	 * @param filter non-empty filter selecting the genotypes to delete
	 * @return result of the bulk operation
	 * @throws InvalidApiUsageException if the filter is {@code null} or empty
	 */
	@Override
	@Transactional
	public MultiOp<Genotype> remove(GenotypeFilter filter) {
		if (filter == null || filter.isEmpty()) {
			throw new InvalidApiUsageException("Must use a filter to delete genotype IDs");
		}
		log.warn("Deleting genotype IDs: {}", filter);
		// Accessions are fetch-joined together with the genotypes in the same query
		var toRemove = jpaQueryFactory.selectFrom(QGenotype.genotype).leftJoin(QGenotype.genotype.accession()).fetchJoin().where(filter.buildPredicate()).fetch();
		log.warn("Deleting {} genotype IDs", toRemove.size());
		return this.remove(toRemove);
	}

	/**
	 * Remove the listed genotypes after checking the caller's permissions.
	 *
	 * @param deletes genotypes to remove
	 * @return result of the bulk operation
	 * @throws AccessDeniedException if the caller lacks permission for any referenced institute
	 */
	@Override
	@Transactional
	public MultiOp<Genotype> remove(List<Genotype> deletes) {
		checkPermissions(deletes);
		return super.remove(deletes);
	}

	/**
	 * Parse genotypes from CSV data and create them with {@link #createFast(List)}.
	 *
	 * @param inputStream CSV data, read as UTF-8
	 * @param separator field separator
	 * @param quotechar quote character
	 * @return result of the bulk operation
	 * @throws Exception on read or parse failures
	 */
	@Override
	@Transactional
	public MultiOp<Genotype> uploadGenotypeIDs(InputStream inputStream, char separator, char quotechar) throws Exception {
		// NOTE(review): withIgnoreQuotations(true) is combined with a caller-supplied quote character —
		// confirm that quoted fields are handled as intended.
		try (CSVReader reader = new CSVReaderBuilder(new BufferedReader(new InputStreamReader(inputStream, ENCODING)))
			.withCSVParser(new CSVParserBuilder()
				.withSeparator(separator)
				.withQuoteChar(quotechar)
				.withEscapeChar(ESCAPE_CHAR)
				.withStrictQuotes(false)
				.withIgnoreQuotations(true)
				.withFieldAsNull(CSVReaderNullFieldIndicator.BOTH)
				.build()).build()) {

			CsvToBean<Genotype> csvToBean = new CsvToBeanBuilder<Genotype>(reader)
				.withType(Genotype.class)
				.withIgnoreLeadingWhiteSpace(true)
				.build();

			var toSave = csvToBean.parse();
			return createFast(toSave);
		}
	}

	/**
	 * Ensure the current user may modify all of the given genotypes. Administrators always pass; other
	 * users must have every referenced institute code among the institutes returned by
	 * {@link InstituteService#listMyInstitutes(Sort)}.
	 *
	 * @param genotypes genotypes about to be created or removed
	 * @throws AccessDeniedException if the user has no institutes or an institute code is not among them
	 */
	private void checkPermissions(List<Genotype> genotypes) {
		if (SecurityContextUtil.hasRole("ADMINISTRATOR")) {
			return;
		}

		var myInstitutes = instituteService.listMyInstitutes(Sort.unsorted());
		if (CollectionUtils.isEmpty(myInstitutes)) {
			throw new AccessDeniedException("No permission for any institute");
		}
		var myInstCodes = myInstitutes.stream().map(FaoInstitute::getCode).collect(Collectors.toSet());

		var missingPermissions = genotypes.stream()
			.map(AccessionRef::getInstCode)
			.filter(code -> !myInstCodes.contains(code))
			.collect(Collectors.toSet());

		if (!missingPermissions.isEmpty()) {
			throw new AccessDeniedException("WRITE permission denied for institutes: " + String.join(", ", missingPermissions));
		}
	}

	/**
	 * Schedule re-matching of genotype accession references in batches of
	 * {@value #REMATCH_BATCH_SIZE}. Each batch is submitted to the task executor; after re-matching,
	 * the {@code genotyped} flag of the matched accessions is refreshed. Failures are logged and not
	 * propagated to the caller.
	 *
	 * @param accessionRefs genotypes to re-match
	 */
	@Override
	public void batchRematchAccessionRefs(List<Genotype> accessionRefs) {
		// Submitting a task is cheap; a plain loop keeps submission ordered and on the calling thread
		for (List<Genotype> batch : Lists.partition(accessionRefs, REMATCH_BATCH_SIZE)) {
			taskExecutor.submit(() -> {
				try {
					log.info("Rematching {} genotypes", batch.size());
					var rematched = accessionRefMatcher.rematchAccessionRefs(batch, repository);
					refreshGenotypedFlags(collectAccessions(rematched));
				} catch (Throwable e) {
					// The returned Future is discarded, so this is the only place a failure becomes visible
					log.error("Rematch failed with {}", e.getMessage(), e);
				}
			});
		}
	}

	/**
	 * Clear the accession reference of all genotypes that point to any of the given accessions.
	 *
	 * @param accessions accessions whose genotype references should be cleared; may be {@code null} or empty
	 * @return number of genotypes updated
	 */
	@Override
	// NOTE(review): READ_UNCOMMITTED allows dirty reads — confirm this isolation level is intended.
	@Transactional(readOnly = false, propagation = Propagation.REQUIRED, isolation = Isolation.READ_UNCOMMITTED)
	public int clearAccessionRefs(Collection<Accession> accessions) {
		if (accessions == null || accessions.isEmpty()) {
			return 0;
		}

		Iterable<Genotype> referencedRefs = repository.findAll(QGenotype.genotype.accession().in(accessions));
		AtomicInteger counter = new AtomicInteger();
		referencedRefs.forEach((ref) -> {
			ref.setAccession(null);
			counter.incrementAndGet();
		});
		// Log the number of affected genotypes, not the entities themselves
		log.warn("Removing accession reference for {} genotypes", counter.get());
		repository.saveAll(referencedRefs);

		return counter.get();
	}

	/**
	 * Refresh the {@code genotyped} flag of the accessions referenced by genotypes that were removed.
	 *
	 * @param genotypes the removed genotypes
	 */
	@Override
	public void handleRemovedGenotypes(Collection<Genotype> genotypes) {
		log.warn("Handling {} removed genotypes", genotypes.size());
		refreshGenotypedFlags(collectAccessions(genotypes));
	}

	/**
	 * Collect the distinct non-null accessions referenced by the given genotypes.
	 *
	 * @param genotypes genotypes to inspect
	 * @return set of referenced accessions, possibly empty
	 */
	private Set<Accession> collectAccessions(Collection<? extends Genotype> genotypes) {
		return genotypes.stream()
			.map(Genotype::getAccession)
			.filter(Objects::nonNull)
			.collect(Collectors.toSet());
	}

	/**
	 * Recompute the {@code genotyped} flag of each accession from the current genotype counts, in a
	 * transaction started through {@link TransactionHelper}. For every accession whose flag changes,
	 * the flag is stored and a {@link FirehoseEvent} of type {@code UPDATE} is published (reindex).
	 * Does nothing when there are no accessions.
	 *
	 * @param accessions accessions to refresh
	 */
	private void refreshGenotypedFlags(Set<Accession> accessions) {
		if (accessions.isEmpty()) {
			return; // Nothing to recount, skip the transaction and the count query
		}
		transactionHelper.executeInTransaction(false, () -> {
			var counts = repository.countByAccessions(accessions);
			Instant timestamp = Instant.now();
			for (Accession accession : accessions) {
				Long count = counts.getOrDefault(accession.getId(), 0L);
				var nowGenotyped = count != null && count > 0;
				if (nowGenotyped != accession.isGenotyped()) {
					accessionRepository.setGenotyped(accession, nowGenotyped); // Avoid changing lastUpdated date
					applicationEventPublisher.publishEvent(new FirehoseEvent(Accession.class, accession.getId(), timestamp, EventType.UPDATE)); // Reindex
				}
			}
			return Void.TYPE;
		});
	}
}