diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java index c621aa6efce9..c883110a3b36 100644 --- a/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java @@ -10,6 +10,7 @@ import static java.lang.String.valueOf; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -106,6 +107,11 @@ public class S3BitStoreService extends BaseBitStoreService { private String endpoint; private boolean pathStyleAccessEnabled; + /** + * The maximum size of individual chunk to download from S3 when a file is accessed. Default 5Mb + */ + private long bufferSize = 5 * 1024 * 1024; + /** * container for all the assets */ @@ -191,7 +197,7 @@ public boolean isEnabled() { @Override public void init() throws IOException { - if (this.isInitialized()) { + if (this.isInitialized() || !this.isEnabled()) { return; } @@ -289,20 +295,7 @@ public InputStream get(Bitstream bitstream) throws IOException { if (isRegisteredBitstream(key)) { key = key.substring(REGISTERED_FLAG.length()); } - try { - File tempFile = File.createTempFile("s3-disk-copy-" + UUID.randomUUID(), "temp"); - tempFile.deleteOnExit(); - - GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, key); - - Download download = tm.download(getObjectRequest, tempFile); - download.waitForCompletion(); - - return new DeleteOnCloseFileInputStream(tempFile); - } catch (AmazonClientException | InterruptedException e) { - log.error("get(" + key + ")", e); - throw new IOException(e); - } + return new S3LazyInputStream(key, bufferSize, bitstream.getSizeBytes()); } /** @@ -669,4 +662,84 @@ public boolean isRegisteredBitstream(String internalId) { return internalId.startsWith(REGISTERED_FLAG); } + public void setBufferSize(long bufferSize) { + this.bufferSize = bufferSize; + } + + /** + * This inner class represent an InputStream that uses temporary files to + * represent chunk of the object downloaded from S3. When the input stream is + * read the class look first to the current chunk and download a new one once if + * the current one as been fully read. The class is responsible to close a chunk + * as soon as a new one is retrieved, the last chunk is closed when the input + * stream itself is closed or the last byte is read (the first of the two) + */ + public class S3LazyInputStream extends InputStream { + private InputStream currentChunkStream; + private String objectKey; + private long endOfChunk = -1; + private long chunkMaxSize; + private long currPos = 0; + private long fileSize; + + public S3LazyInputStream(String objectKey, long chunkMaxSize, long fileSize) throws IOException { + this.objectKey = objectKey; + this.chunkMaxSize = chunkMaxSize; + this.endOfChunk = 0; + this.fileSize = fileSize; + downloadChunk(); + } + + @Override + public int read() throws IOException { + // is the current chunk completely read and other are available? + if (currPos == endOfChunk && currPos < fileSize) { + currentChunkStream.close(); + downloadChunk(); + } + + int byteRead = currPos < endOfChunk ? currentChunkStream.read() : -1; + // do we get any data or are we at the end of the file? + if (byteRead != -1) { + currPos++; + } else { + currentChunkStream.close(); + } + return byteRead; + } + + /** + * This method download the next chunk from S3 + * + * @throws IOException + * @throws FileNotFoundException + */ + private void downloadChunk() throws IOException, FileNotFoundException { + // Create a DownloadFileRequest with the desired byte range + long startByte = currPos; // Start byte (inclusive) + long endByte = Long.min(startByte + chunkMaxSize - 1, fileSize - 1); // End byte (inclusive) + GetObjectRequest getRequest = new GetObjectRequest(bucketName, objectKey) + .withRange(startByte, endByte); + + File currentChunkFile = File.createTempFile("s3-disk-copy-" + UUID.randomUUID(), "temp"); + currentChunkFile.deleteOnExit(); + try { + Download download = tm.download(getRequest, currentChunkFile); + download.waitForCompletion(); + currentChunkStream = new DeleteOnCloseFileInputStream(currentChunkFile); + endOfChunk = endOfChunk + download.getProgress().getBytesTransferred(); + } catch (AmazonClientException | InterruptedException e) { + currentChunkFile.delete(); + throw new IOException(e); + } + } + + @Override + public void close() throws IOException { + if (currentChunkStream != null) { + currentChunkStream.close(); + } + } + + } } diff --git a/dspace-api/src/test/java/org/dspace/storage/bitstore/S3BitStoreServiceIT.java b/dspace-api/src/test/java/org/dspace/storage/bitstore/S3BitStoreServiceIT.java index 7aae1cf2719c..33cbabc0aafc 100644 --- a/dspace-api/src/test/java/org/dspace/storage/bitstore/S3BitStoreServiceIT.java +++ b/dspace-api/src/test/java/org/dspace/storage/bitstore/S3BitStoreServiceIT.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -42,6 +43,7 @@ import io.findify.s3mock.S3Mock; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.BooleanUtils; import org.dspace.AbstractIntegrationTestWithDatabase; import org.dspace.app.matcher.LambdaMatcher; import org.dspace.authorize.AuthorizeException; @@ -53,6 +55,8 @@ import org.dspace.content.Collection; import org.dspace.content.Item; import org.dspace.core.Utils; +import org.dspace.services.ConfigurationService; +import org.dspace.services.factory.DSpaceServicesFactory; import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.After; @@ -77,9 +81,12 @@ public class S3BitStoreServiceIT extends AbstractIntegrationTestWithDatabase { private File s3Directory; + private ConfigurationService configurationService = DSpaceServicesFactory.getInstance().getConfigurationService(); + @Before public void setup() throws Exception { + configurationService.setProperty("assetstore.s3.enabled", "true"); s3Directory = new File(System.getProperty("java.io.tmpdir"), "s3"); s3Mock = S3Mock.create(8001, s3Directory.getAbsolutePath()); @@ -88,7 +95,9 @@ public void setup() throws Exception { amazonS3Client = createAmazonS3Client(); s3BitStoreService = new S3BitStoreService(amazonS3Client); - + s3BitStoreService.setEnabled(BooleanUtils.toBoolean( + configurationService.getProperty("assetstore.s3.enabled"))); + s3BitStoreService.setBufferSize(22); context.turnOffAuthorisationSystem(); parentCommunity = CommunityBuilder.createCommunity(context) @@ -119,12 +128,25 @@ public void testBitstreamPutAndGetWithAlreadyPresentBucket() throws IOException assertThat(amazonS3Client.listBuckets(), contains(bucketNamed(bucketName))); context.turnOffAuthorisationSystem(); - String content = "Test bitstream content"; + String content = "Test bitstream content"; + String contentOverOneSpan = "This content span two chunks"; + String contentExactlyTwoSpans = "Test bitstream contentTest bitstream content"; + String contentOverOneTwoSpans = "Test bitstream contentThis content span three chunks"; Bitstream bitstream = createBitstream(content); + Bitstream bitstreamOverOneSpan = createBitstream(contentOverOneSpan); + Bitstream bitstreamExactlyTwoSpans = createBitstream(contentExactlyTwoSpans); + Bitstream bitstreamOverOneTwoSpans = createBitstream(contentOverOneTwoSpans); context.restoreAuthSystemState(); - s3BitStoreService.put(bitstream, toInputStream(content)); + checkGetPut(bucketName, content, bitstream); + checkGetPut(bucketName, contentOverOneSpan, bitstreamOverOneSpan); + checkGetPut(bucketName, contentExactlyTwoSpans, bitstreamExactlyTwoSpans); + checkGetPut(bucketName, contentOverOneTwoSpans, bitstreamOverOneTwoSpans); + } + + private void checkGetPut(String bucketName, String content, Bitstream bitstream) throws IOException { + s3BitStoreService.put(bitstream, toInputStream(content)); String expectedChecksum = Utils.toHex(generateChecksum(content)); assertThat(bitstream.getSizeBytes(), is((long) content.length())); @@ -137,7 +159,6 @@ public void testBitstreamPutAndGetWithAlreadyPresentBucket() throws IOException String key = s3BitStoreService.getFullKey(bitstream.getInternalId()); ObjectMetadata objectMetadata = amazonS3Client.getObjectMetadata(bucketName, key); assertThat(objectMetadata.getContentMD5(), is(expectedChecksum)); - } @Test @@ -382,6 +403,17 @@ public void givenBitStreamIdentifierWithSlashesWhenSanitizedThenSlashesMustBeRem assertThat(computedPath, Matchers.not(Matchers.containsString(File.separator))); } + @Test + public void testDoNotInitializeConfigured() throws Exception { + String assetstores3enabledOldValue = configurationService.getProperty("assetstore.s3.enabled"); + configurationService.setProperty("assetstore.s3.enabled", false); + s3BitStoreService = new S3BitStoreService(amazonS3Client); + s3BitStoreService.init(); + assertFalse(s3BitStoreService.isInitialized()); + assertFalse(s3BitStoreService.isEnabled()); + configurationService.setProperty("assetstore.s3.enabled", assetstores3enabledOldValue); + } + private byte[] generateChecksum(String content) { try { MessageDigest m = MessageDigest.getInstance("MD5");