ImportFromCSV.java

/*
 * Copyright 2017 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.genesys.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.commons.io.ByteOrderMark;
import org.apache.commons.io.input.BOMInputStream;
import org.apache.commons.lang3.ArrayUtils;
import org.genesys.filerepository.NoSuchRepositoryFileException;
import org.genesys.server.exception.InvalidApiUsageException;

import com.google.common.base.Charsets;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.exceptions.CsvValidationException;

import lombok.extern.slf4j.Slf4j;

/**
 * Utilty class for reading CSV/TSV data.
 */
@Slf4j
public abstract class ImportFromCSV<T> {

	private static final int MAX_CSV_ERRORS = 50;
	private static final char[] DEFAULT_SEPARATORS = { '\t', ',', ';' };


	public abstract Map<String, String> updateFromCsv(final InputStream str, char separator, char quoteChar, char escapeChar) throws IOException;

	protected Map<String, String> uploadData(final InputStream in, char separator, char quoteChar, char escapeChar) throws IOException {

		// Windows :-/
		BOMInputStream bomIn = BOMInputStream.builder()
			.setInputStream(in)
			.setByteOrderMarks(ByteOrderMark.UTF_8, ByteOrderMark.UTF_16LE, ByteOrderMark.UTF_16BE, ByteOrderMark.UTF_32LE, ByteOrderMark.UTF_32BE)
			.get();

		var errors = new TreeMap<String, String>();

		try (var streamReader = new BufferedReader(new InputStreamReader(bomIn, Charsets.UTF_8), 1024)) {
			streamReader.mark(1000);
			var firstLine = streamReader.readLine();
			separator = detectSeparator(firstLine, separator, quoteChar, escapeChar);
			log.warn("Using separator '{}' for line: {}", separator, firstLine);
			streamReader.reset();

			var parser = new CSVParserBuilder()
				.withSeparator(separator)
				.withQuoteChar(quoteChar)
				.withEscapeChar(escapeChar)
				.withStrictQuotes(false)
				.withIgnoreLeadingWhiteSpace(false)
				.withIgnoreQuotations(false)
				.build();


			var batch = new LinkedList<T>();
			try (CSVReader reader = new CSVReaderBuilder(streamReader).withCSVParser(parser).build()) {

				// header line
				String[] recordLine = reader.readNext();
				verifyHeaders(recordLine);

				var counter = 0;
				while ((recordLine = reader.readNext()) != null) {
					counter++;
					try {
						batch.add(readRecord(recordLine));
						if (batch.size() >= 100) {
							errors.putAll(updateRecords(batch));
							batch.clear();
						}
					} catch (Throwable e) {
						errors.put(recordLine[0], e.getMessage());
						log.info("Error {}/{} updating metadata: {}", errors.size(), MAX_CSV_ERRORS, e.getMessage());
						if (errors.size() >= MAX_CSV_ERRORS) return errors;
					}
					if (counter % 1000 == 0) log.warn("Updated metadata of {} repository files", counter);
				}
				try {
					if (batch.size() > 0) {
						errors.putAll(updateRecords(batch));
						batch.clear();
					}
				} catch (Throwable e) {
					errors.put("Final", e.getMessage());
					log.info("Error {}/{} updating metadata: {}", errors.size(), MAX_CSV_ERRORS, e.getMessage());
					if (errors.size() >= MAX_CSV_ERRORS) return errors;
				}
				log.warn("Updated metadata of {} repository files", counter);

			} catch (IOException e) {
				log.error("Error reading input: {}", e.getMessage(), e);
				throw e;
			} catch (CsvValidationException e) {
				log.error("CSV validation exception.", e);
				throw new IOException(e);
			}
		}

		return errors;
	}

	protected char detectSeparator(String headersLine, char defaultSeparator, char quoteChar, char escapeChar) {
		char[] separators = ArrayUtils.addFirst(DEFAULT_SEPARATORS, defaultSeparator);

		for (char separator : separators) {
			try {
				var parser = new CSVParserBuilder()
					.withSeparator(separator)
					.withQuoteChar(quoteChar)
					.withEscapeChar(escapeChar)
					.withStrictQuotes(false)
					.withIgnoreLeadingWhiteSpace(false)
					.withIgnoreQuotations(false)
					.build();

					try (CSVReader reader = new CSVReaderBuilder(new StringReader(headersLine)).withCSVParser(parser).build()) {
						// header line
						String[] recordLine = reader.readNext();
						verifyHeaders(recordLine);
						return separator;
					}

			} catch (Throwable e) {
				// log and ignore
				log.debug("Separator '{}' does not work: {} for line: {}", separator, e.getMessage(), headersLine);
			}
		}
		return defaultSeparator;
	}

	/**
	 * Tests if header line contains the expected headers
	 * 
	 * @param headers
	 * @throws IOException when headers don't match
	 */
	protected abstract void verifyHeaders(String[] headers) throws IOException;

	/**
	 * Update your object!
	 *
	 * @param recordLine
	 * @return
	 * @throws IOException
	 * @throws InvalidApiUsageException
	 * @throws NoSuchRepositoryFileException
	 */
	protected abstract T readRecord(String[] recordLine) throws IOException;

	/**
	 * @param batch the batch of updates
	 * @return map of errors by record key
	 * @throws Exception
	 */
	protected abstract Map<String, String> updateRecords(List<T> batch) throws Exception;
}