diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java index 368a50ff9cf..77cd429cbc8 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java @@ -71,7 +71,8 @@ public class S3DataSegmentPuller implements URIDataPuller this.s3Client = s3Client; } - FileUtils.FileCopyResult getSegmentFiles(final CloudObjectLocation s3Coords, final File outDir) throws SegmentLoadingException + FileUtils.FileCopyResult getSegmentFiles(final CloudObjectLocation s3Coords, final File outDir) + throws SegmentLoadingException { log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir); @@ -149,16 +150,15 @@ public class S3DataSegmentPuller implements URIDataPuller } } - private FileObject buildFileObject(final URI uri) throws AmazonServiceException + public FileObject buildFileObject(final URI uri) throws AmazonServiceException { final CloudObjectLocation coords = new CloudObjectLocation(S3Utils.checkURI(uri)); - final S3ObjectSummary objectSummary = - S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath()); final String path = uri.getPath(); return new FileObject() { S3Object s3Object = null; + S3ObjectSummary objectSummary = null; @Override public URI toUri() @@ -182,7 +182,7 @@ public class S3DataSegmentPuller implements URIDataPuller try { if (s3Object == null) { // lazily promote to full GET - s3Object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey()); + s3Object = s3Client.getObject(coords.getBucket(), coords.getPath()); } final InputStream in = s3Object.getObjectContent(); @@ -231,6 +231,13 @@ public class S3DataSegmentPuller implements URIDataPuller @Override public long getLastModified() { + if (s3Object != null) { + return s3Object.getObjectMetadata().getLastModified().getTime(); + } + if (objectSummary == null) { + objectSummary = + S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath()); + } return objectSummary.getLastModified().getTime(); } @@ -252,9 +259,7 @@ public class S3DataSegmentPuller implements URIDataPuller * Returns the "version" (aka last modified timestamp) of the URI * * @param uri The URI to check the last timestamp - * * @return The time in ms of the last modification of the URI in String format - * * @throws IOException */ @Override diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java index fdeb2212ba6..7b805249f99 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java @@ -24,6 +24,7 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.commons.io.IOUtils; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; @@ -38,6 +39,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.nio.charset.StandardCharsets; @@ -107,18 +109,11 @@ public class S3DataSegmentPullerTest objectSummary.setKey(keyPrefix + "/renames-0.gz"); objectSummary.setLastModified(new Date(0)); - final ListObjectsV2Result listObjectsResult = new ListObjectsV2Result(); - listObjectsResult.setKeyCount(1); - listObjectsResult.getObjectSummaries().add(objectSummary); - final File tmpDir = temporaryFolder.newFolder("gzTestDir"); EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) .andReturn(true) .once(); - EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) - .andReturn(listObjectsResult) - .once(); EasyMock.expect(s3Client.getObject(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) .andReturn(object0) .once(); @@ -177,15 +172,9 @@ public class S3DataSegmentPullerTest EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) .andReturn(true) .once(); - EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) - .andReturn(listObjectsResult) - .once(); EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))) .andThrow(exception) .once(); - EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) - .andReturn(listObjectsResult) - .once(); EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))) .andReturn(object0) .once(); @@ -206,4 +195,75 @@ public class S3DataSegmentPullerTest Assert.assertEquals(value.length, expected.length()); } + @Test + public void testS3ObjectStream() throws IOException + { + final String bucket = "bucket"; + final String keyPrefix = "prefix/dir/0"; + final ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class); + final byte[] value = bucket.getBytes(StandardCharsets.UTF_8); + + final File tmpFile = temporaryFolder.newFile("testObjectFile"); + + try (OutputStream outputStream = new FileOutputStream(tmpFile)) { + outputStream.write(value); + } + + final S3Object object0 = new S3Object(); + object0.setBucketName(bucket); + object0.setKey(keyPrefix + "/test-object"); + object0.getObjectMetadata().setLastModified(new Date(0)); + object0.setObjectContent(new FileInputStream(tmpFile)); + + EasyMock.expect(s3Client.getObject(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) + .andReturn(object0) + .once(); + S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); + EasyMock.replay(s3Client); + InputStream stream = puller.buildFileObject(URI.create(StringUtils.format("s3://%s/%s", bucket, object0.getKey()))) + .openInputStream(); + EasyMock.verify(s3Client); + Assert.assertEquals(bucket, IOUtils.toString(stream, StandardCharsets.UTF_8)); + } + + @Test + public void testS3ObjectModifiedDate() throws IOException + { + final String bucket = "bucket"; + final String keyPrefix = "prefix/dir/0"; + final ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class); + final byte[] value = bucket.getBytes(StandardCharsets.UTF_8); + + final File tmpFile = temporaryFolder.newFile("testObjectFile"); + + try (OutputStream outputStream = new FileOutputStream(tmpFile)) { + outputStream.write(value); + } + + final S3Object object0 = new S3Object(); + object0.setBucketName(bucket); + object0.setKey(keyPrefix + "/test-object"); + object0.getObjectMetadata().setLastModified(new Date(0)); + object0.setObjectContent(new FileInputStream(tmpFile)); + + final S3ObjectSummary objectSummary = new S3ObjectSummary(); + objectSummary.setBucketName(bucket); + objectSummary.setKey(keyPrefix + "/test-object"); + objectSummary.setLastModified(new Date(0)); + + final ListObjectsV2Result result = new ListObjectsV2Result(); + result.setKeyCount(1); + result.getObjectSummaries().add(objectSummary); + + EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) + .andReturn(result) + .once(); + + S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); + EasyMock.replay(s3Client); + long modifiedDate = puller.buildFileObject(URI.create(StringUtils.format("s3://%s/%s", bucket, object0.getKey()))) + .getLastModified(); + EasyMock.verify(s3Client); + Assert.assertEquals(0, modifiedDate); + } }