TemporaryBytesManager.java
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.filerepository.service.ftp;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.file.Path;
import org.apache.commons.io.IOUtils;
import org.genesys.filerepository.InvalidRepositoryFileDataException;
import org.genesys.filerepository.InvalidRepositoryPathException;
import org.genesys.filerepository.NoSuchRepositoryFileException;
import org.genesys.filerepository.model.RepositoryFile;
import org.genesys.filerepository.service.BytesStorageService;
import org.genesys.filerepository.service.RepositoryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* This little beast manages a local copy of bytes for the FTP service. It makes
* a local copy of the bytes, allows for downloading, updating and synchronizing
* back to the {@link BytesStorageService}
*
* @author Matija Obreza
*/
@Component
@Slf4j
public class TemporaryBytesManager {
/** The repository service. */
@Autowired(required = true)
private RepositoryService repositoryService;
// private int maxFileSize = 1024 * 1024 * 50; // 50MB
/**
* Create a local file for writing. On stream close push the file to
* {@link BytesStorageService}
*
* @param user the user
* @param path the path
* @return the output stream
* @throws IOException Signals that an I/O exception has occurred.
*/
public OutputStream newFile(FtpUser user, final Path path) throws IOException {
final Path parent = path.getParent();
assert path.getFileName() != null;
final String filename = path.getFileName().toString();
log.info("Creating new path={} parent={} filename={}", path, parent, filename);
final File tempFile = File.createTempFile("ftp-", ".data");
return new RepositorySyncOutputStream(new FileOutputStream(tempFile)) {
@Override
protected void cleanup() {
log.info("Removing temporary file={}", tempFile.getAbsolutePath());
tryDelete(tempFile);
}
@Override
protected void synchronizeWithRepository() throws IOException {
FtpRunAs.asFtpUser(user, () -> {
try {
log.info("Synchronizing file={} with repository path={} originalFilename={}", tempFile.getAbsolutePath(), parent, filename);
return repositoryService.addFile(parent, filename, null, tempFile, null);
} catch (InvalidRepositoryPathException | InvalidRepositoryFileDataException e) {
log.warn("Error synchronizing new file parent={} filename={} with repository: {}", parent, filename, e.getMessage());
throw new IOException(e);
}
});
}
};
}
/**
* Read temp file to bytes.
*
* @param tempFile the temp file
* @return the byte[]
* @throws IOException Signals that an I/O exception has occurred.
*/
protected byte[] readTempFileToBytes(final File tempFile) throws IOException {
var fileLength = tempFile.length();
log.trace("Creating memory buffer of size={}", fileLength);
try (InputStream tos = new BufferedInputStream(new FileInputStream(tempFile));
// and the destination buffer
ByteArrayOutputStream baos = new ByteArrayOutputStream((int) fileLength)) {
IOUtils.copy(tos, baos);
baos.flush();
return baos.toByteArray();
}
}
/**
* Creates the input stream.
*
* @param repositoryFile the repository file
* @param offset the offset
* @return the input stream
* @throws IOException Signals that an I/O exception has occurred.
*/
public InputStream createInputStream(final RepositoryFile repositoryFile, final long offset) throws IOException {
log.info("Preparing input stream for rf={} offset={}", repositoryFile.getUuid(), offset);
final File tempFile = shadowRepositoryFile(repositoryFile);
// we have a local copy, now return the funny stream
final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(tempFile)) {
@Override
public void close() throws IOException {
super.close();
if (tempFile.exists()) {
log.debug("Removing temp file={}", tempFile.getAbsolutePath());
tryDelete(tempFile);
}
}
};
if (offset > 0) {
var skipped = 0;
while (skipped < offset) {
log.trace("Skipping {} of {} bytes to skip in the input stream, skipped {}", offset - skipped, offset, skipped);
skipped += bis.skip(offset - skipped);
}
}
return bis;
}
/**
* Shadow repository file.
*
* @param repositoryFile the repository file
* @return the file
* @throws IOException Signals that an I/O exception has occurred.
* @throws FileNotFoundException the file not found exception
*/
private File shadowRepositoryFile(final RepositoryFile repositoryFile) throws IOException {
final File tempFile = File.createTempFile("ftp-", ".data");
try (OutputStream tempFileStream = new BufferedOutputStream(new FileOutputStream(tempFile, false))) {
repositoryService.streamFileBytes(repositoryFile, tempFileStream);
}
log.debug("Shadow bytes={} from repository file.size={}", tempFile.length(), repositoryFile.getSize());
return tempFile;
}
/**
* Creates the output stream.
*
* @param user the user
* @param repositoryFile the repository file
* @param offset the offset
* @return the output stream
* @throws IOException Signals that an I/O exception has occurred.
*/
public OutputStream createOutputStream(final FtpUser user, final RepositoryFile repositoryFile, final long offset) throws IOException {
final File tempFile = shadowRepositoryFile(repositoryFile);
log.info("Preparing local copy of rf={} offset={} temp={}", repositoryFile.getUuid(), offset, tempFile.getAbsolutePath());
final RandomAccessFile raf = new RandomAccessFile(tempFile, "rw");
if (offset != 0) {
log.trace("Skipping {} of output stream", offset);
raf.seek(offset);
}
return new RepositorySyncOutputStream(Channels.newOutputStream(raf.getChannel())) {
@Override
protected void cleanup() {
log.info("Removing temporary file={}", tempFile.getAbsolutePath());
IOUtils.closeQuietly(raf);
tryDelete(tempFile);
}
@Override
protected void synchronizeWithRepository() throws IOException {
FtpRunAs.asFtpUser(user, () -> {
log.info("Stream to temporary upload buffer is closed, synchronizing bytes={}", tempFile.length());
try {
repositoryService.updateBytes(repositoryFile, null, tempFile);
} catch (final NoSuchRepositoryFileException e) {
log.warn("Error synchronizing new file with repository: {}", e.getMessage());
throw new IOException(e);
}
log.info("Synchronized file={} with repository path={} originalFilename={}", tempFile.getAbsolutePath(), repositoryFile.getFolder().getPath(), repositoryFile.getOriginalFilename());
return null;
});
}
};
}
private static void tryDelete(File file) {
log.debug("DELETING {}", file.getAbsolutePath());
try {
if (!file.delete()) {
log.debug("{} is not deleted", file.getAbsolutePath());
}
} catch (Exception e) {
log.debug("{} is not deleted", file.getAbsolutePath());
}
}
}