TemporaryBytesManager.java

/*
 * Copyright 2018 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.genesys.filerepository.service.ftp;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.file.Path;

import org.apache.commons.io.IOUtils;
import org.genesys.filerepository.InvalidRepositoryFileDataException;
import org.genesys.filerepository.InvalidRepositoryPathException;
import org.genesys.filerepository.NoSuchRepositoryFileException;
import org.genesys.filerepository.model.RepositoryFile;
import org.genesys.filerepository.service.BytesStorageService;
import org.genesys.filerepository.service.RepositoryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * This little beast manages a local copy of bytes for the FTP service. It makes
 * a local copy of the bytes, allows for downloading, updating and synchronizing
 * back to the {@link BytesStorageService}
 *
 * @author Matija Obreza
 */
@Component
@Slf4j
public class TemporaryBytesManager {

	/** The repository service. */
	@Autowired(required = true)
	private RepositoryService repositoryService;

	// private int maxFileSize = 1024 * 1024 * 50; // 50MB

	/**
	 * Create a local file for writing. On stream close push the file to
	 * {@link BytesStorageService}
	 *
	 * @param user the user
	 * @param path the path
	 * @return the output stream
	 * @throws IOException Signals that an I/O exception has occurred.
	 */
	public OutputStream newFile(FtpUser user, final Path path) throws IOException {
		final Path parent = path.getParent();
		assert path.getFileName() != null;
		final String filename = path.getFileName().toString();

		log.info("Creating new path={} parent={} filename={}", path, parent, filename);

		final File tempFile = File.createTempFile("ftp-", ".data");

		return new RepositorySyncOutputStream(new FileOutputStream(tempFile)) {
			@Override
			protected void cleanup() {
				log.info("Removing temporary file={}", tempFile.getAbsolutePath());
				tryDelete(tempFile);
			}

			@Override
			protected void synchronizeWithRepository() throws IOException {
				FtpRunAs.asFtpUser(user, () -> {
					try {
						log.info("Synchronizing file={} with repository path={} originalFilename={}", tempFile.getAbsolutePath(), parent, filename);
						return repositoryService.addFile(parent, filename, null, tempFile, null);
					} catch (InvalidRepositoryPathException | InvalidRepositoryFileDataException e) {
						log.warn("Error synchronizing new file parent={} filename={} with repository: {}", parent, filename, e.getMessage());
						throw new IOException(e);
					}
				});
			}
		};
	}

	/**
	 * Read temp file to bytes.
	 *
	 * @param tempFile the temp file
	 * @return the byte[]
	 * @throws IOException Signals that an I/O exception has occurred.
	 */
	protected byte[] readTempFileToBytes(final File tempFile) throws IOException {
		var fileLength = tempFile.length();
		log.trace("Creating memory buffer of size={}", fileLength);
		try (InputStream tos = new BufferedInputStream(new FileInputStream(tempFile));
				// and the destination buffer
				ByteArrayOutputStream baos = new ByteArrayOutputStream((int) fileLength)) {

			IOUtils.copy(tos, baos);
			baos.flush();
			return baos.toByteArray();
		}
	}

	/**
	 * Creates the input stream.
	 *
	 * @param repositoryFile the repository file
	 * @param offset the offset
	 * @return the input stream
	 * @throws IOException Signals that an I/O exception has occurred.
	 */
	public InputStream createInputStream(final RepositoryFile repositoryFile, final long offset) throws IOException {
		log.info("Preparing input stream for rf={} offset={}", repositoryFile.getUuid(), offset);
		final File tempFile = shadowRepositoryFile(repositoryFile);

		// we have a local copy, now return the funny stream
		final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(tempFile)) {
			@Override
			public void close() throws IOException {
				super.close();
				if (tempFile.exists()) {
					log.debug("Removing temp file={}", tempFile.getAbsolutePath());
					tryDelete(tempFile);
				}
			}
		};

		if (offset > 0) {
			var skipped = 0;
			while (skipped < offset) {
				log.trace("Skipping {} of {} bytes to skip in the input stream, skipped {}", offset - skipped, offset, skipped);
				skipped += bis.skip(offset - skipped);
			}
		}
		return bis;
	}

	/**
	 * Shadow repository file.
	 *
	 * @param repositoryFile the repository file
	 * @return the file
	 * @throws IOException Signals that an I/O exception has occurred.
	 * @throws FileNotFoundException the file not found exception
	 */
	private File shadowRepositoryFile(final RepositoryFile repositoryFile) throws IOException {
		final File tempFile = File.createTempFile("ftp-", ".data");
		try (OutputStream tempFileStream = new BufferedOutputStream(new FileOutputStream(tempFile, false))) {
			repositoryService.streamFileBytes(repositoryFile, tempFileStream);
		}
		log.debug("Shadow bytes={} from repository file.size={}", tempFile.length(), repositoryFile.getSize());
		return tempFile;
	}

	/**
	 * Creates the output stream.
	 *
	 * @param user the user
	 * @param repositoryFile the repository file
	 * @param offset the offset
	 * @return the output stream
	 * @throws IOException Signals that an I/O exception has occurred.
	 */
	public OutputStream createOutputStream(final FtpUser user, final RepositoryFile repositoryFile, final long offset) throws IOException {
		final File tempFile = shadowRepositoryFile(repositoryFile);
		log.info("Preparing local copy of rf={} offset={} temp={}", repositoryFile.getUuid(), offset, tempFile.getAbsolutePath());

		final RandomAccessFile raf = new RandomAccessFile(tempFile, "rw");
		if (offset != 0) {
			log.trace("Skipping {} of output stream", offset);
			raf.seek(offset);
		}

		return new RepositorySyncOutputStream(Channels.newOutputStream(raf.getChannel())) {
			@Override
			protected void cleanup() {
				log.info("Removing temporary file={}", tempFile.getAbsolutePath());
				IOUtils.closeQuietly(raf);
				tryDelete(tempFile);
			}

			@Override
			protected void synchronizeWithRepository() throws IOException {
				FtpRunAs.asFtpUser(user, () -> {
					log.info("Stream to temporary upload buffer is closed, synchronizing bytes={}", tempFile.length());
					try {
						repositoryService.updateBytes(repositoryFile, null, tempFile);
					} catch (final NoSuchRepositoryFileException e) {
						log.warn("Error synchronizing new file with repository: {}", e.getMessage());
						throw new IOException(e);
					}
					log.info("Synchronized file={} with repository path={} originalFilename={}", tempFile.getAbsolutePath(), repositoryFile.getFolder().getPath(), repositoryFile.getOriginalFilename());
					return null;
				});
			}
		};
	}

	private static void tryDelete(File file) {
		log.debug("DELETING {}", file.getAbsolutePath());
		try {
			if (!file.delete()) {
				log.debug("{} is not deleted", file.getAbsolutePath());
			}
		} catch (Exception e) {
			log.debug("{} is not deleted", file.getAbsolutePath());
		}
	}
}