diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java index 59c83f2e5a8..4a424ae9f18 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java @@ -30,18 +30,13 @@ import com.metamx.common.FileUtils; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.MapUtils; +import com.metamx.common.StringUtils; import com.metamx.common.UOE; import com.metamx.common.logger.Logger; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.URIDataPuller; import io.druid.timeline.DataSegment; -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; - -import javax.tools.FileObject; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -51,6 +46,11 @@ import java.io.Writer; import java.net.URI; import java.util.Map; import java.util.concurrent.Callable; +import javax.tools.FileObject; +import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.model.StorageObject; /** * A data segment puller that also hanldes URI data pulls. @@ -59,15 +59,17 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller { public static final int DEFAULT_RETRY_COUNT = 3; - public static FileObject buildFileObject(final URI uri, final RestS3Service s3Client) throws S3ServiceException + public static FileObject buildFileObject(final URI uri, final RestS3Service s3Client) throws ServiceException { final S3Coords coords = new S3Coords(checkURI(uri)); - final S3Object s3Obj = s3Client.getObject(coords.bucket, coords.path); + final StorageObject s3Obj = s3Client.getObjectDetails(coords.bucket, coords.path); final String path = uri.getPath(); return new FileObject() { + final Object inputStreamOpener = new Object(); volatile boolean streamAcquired = false; + volatile StorageObject storageObject = s3Obj; @Override public URI toUri() @@ -86,11 +88,19 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller public InputStream openInputStream() throws IOException { try { - streamAcquired = true; - return s3Obj.getDataInputStream(); + synchronized (inputStreamOpener) { + if (streamAcquired) { + return storageObject.getDataInputStream(); + } + // lazily promote to full GET + storageObject = s3Client.getObject(s3Obj.getBucketName(), s3Obj.getKey()); + final InputStream stream = storageObject.getDataInputStream(); + streamAcquired = true; + return stream; + } } catch (ServiceException e) { - throw new IOException(String.format("Could not load S3 URI [%s]", uri), e); + throw new IOException(StringUtils.safeFormat("Could not load S3 URI [%s]", uri), e); } } @@ -129,19 +139,6 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller { throw new UOE("Cannot delete S3 items anonymously. jetS3t doesn't support authenticated deletes easily."); } - - @Override - public void finalize() throws Throwable - { - try { - if (!streamAcquired) { - s3Obj.closeDataInputStream(); - } - } - finally { - super.finalize(); - } - } }; } @@ -220,7 +217,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller final String fname = Files.getNameWithoutExtension(uri.getPath()); final File outFile = new File(outDir, fname); - final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile); + final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile, S3Utils.S3RETRY); log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outFile.getAbsolutePath()); return result; } @@ -301,12 +298,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller { try { final FileObject object = buildFileObject(uri, s3Client); - // buildFileObject has a hidden input stream that gets open deep in jets3t. This helps prevent resource leaks - try (InputStream nullStream = object.openInputStream()) { - return String.format("%d", object.getLastModified()); - } + return String.format("%d", object.getLastModified()); } - catch (S3ServiceException e) { + catch (ServiceException e) { if (S3Utils.isServiceExceptionRecoverable(e)) { // The recoverable logic is always true for IOException, so we want to only pass IOException if it is recoverable throw new IOException( diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index eea6a04d815..2c0f22a071b 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -24,13 +24,11 @@ import com.google.common.base.Predicate; import com.metamx.common.RetryUtils; import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; -import org.jets3t.service.ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Bucket; -import org.jets3t.service.model.S3Object; - import java.io.IOException; import java.util.concurrent.Callable; +import org.jets3t.service.ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.model.S3Object; /** * @@ -91,7 +89,7 @@ public class S3Utils throws ServiceException { try { - s3Client.getObjectDetails(new S3Bucket(bucketName), objectKey); + s3Client.getObjectDetails(bucketName, objectKey); } catch (ServiceException e) { if (404 == e.getResponseCode() diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java index ea0585ddfdd..fca6863e8a7 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java @@ -21,31 +21,35 @@ package io.druid.storage.s3; import com.metamx.common.FileUtils; import io.druid.segment.loading.SegmentLoadingException; -import org.easymock.EasyMock; -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Bucket; -import org.jets3t.service.model.S3Object; -import org.junit.Assert; -import org.junit.Test; - import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; -import java.nio.file.Files; import java.util.Date; import java.util.zip.GZIPOutputStream; +import org.easymock.EasyMock; +import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.model.S3Object; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; /** * */ public class S3DataSegmentPullerTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Test - public void testSimpleGetVersion() throws S3ServiceException, IOException + public void testSimpleGetVersion() throws ServiceException, IOException { String bucket = "bucket"; String keyPrefix = "prefix/dir/0"; @@ -57,7 +61,9 @@ public class S3DataSegmentPullerTest object0.setKey(keyPrefix + "/renames-0.gz"); object0.setLastModifiedDate(new Date(0)); - EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))).andReturn(object0).once(); + EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))) + .andReturn(object0) + .once(); S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); EasyMock.replay(s3Client); @@ -70,64 +76,64 @@ public class S3DataSegmentPullerTest } @Test - public void testGZUncompress() throws S3ServiceException, IOException, SegmentLoadingException + public void testGZUncompress() throws ServiceException, IOException, SegmentLoadingException { final String bucket = "bucket"; final String keyPrefix = "prefix/dir/0"; final RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); final byte[] value = bucket.getBytes("utf8"); - final File tmpFile = Files.createTempFile("gzTest", ".gz").toFile(); - tmpFile.deleteOnExit(); + final File tmpFile = temporaryFolder.newFile("gzTest.gz"); + try (OutputStream outputStream = new GZIPOutputStream(new FileOutputStream(tmpFile))) { outputStream.write(value); } - S3Object object0 = new S3Object(); + final S3Object object0 = new S3Object(); object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); object0.setLastModifiedDate(new Date(0)); object0.setDataInputStream(new FileInputStream(tmpFile)); - File tmpDir = Files.createTempDirectory("gzTestDir").toFile(); + final File tmpDir = temporaryFolder.newFolder("gzTestDir"); - try { - EasyMock.expect(s3Client.getObjectDetails(EasyMock.anyObject(), EasyMock.eq(object0.getKey()))) - .andReturn(null) - .once(); - EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))).andReturn(object0).once(); - S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); + EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) + .andReturn(null) + .once(); + EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) + .andReturn(object0) + .once(); + EasyMock.expect(s3Client.getObject(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) + .andReturn(object0) + .once(); + S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); - EasyMock.replay(s3Client); - FileUtils.FileCopyResult result = puller.getSegmentFiles( - new S3DataSegmentPuller.S3Coords( - bucket, - object0.getKey() - ), tmpDir - ); - EasyMock.verify(s3Client); + EasyMock.replay(s3Client); + FileUtils.FileCopyResult result = puller.getSegmentFiles( + new S3DataSegmentPuller.S3Coords( + bucket, + object0.getKey() + ), tmpDir + ); + EasyMock.verify(s3Client); - Assert.assertEquals(value.length, result.size()); - File expected = new File(tmpDir, "renames-0"); - Assert.assertTrue(expected.exists()); - Assert.assertEquals(value.length, expected.length()); - } - finally { - org.apache.commons.io.FileUtils.deleteDirectory(tmpDir); - } + Assert.assertEquals(value.length, result.size()); + File expected = new File(tmpDir, "renames-0"); + Assert.assertTrue(expected.exists()); + Assert.assertEquals(value.length, expected.length()); } @Test - public void testGZUncompressRetries() throws S3ServiceException, IOException, SegmentLoadingException + public void testGZUncompressRetries() throws ServiceException, IOException, SegmentLoadingException { final String bucket = "bucket"; final String keyPrefix = "prefix/dir/0"; final RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); final byte[] value = bucket.getBytes("utf8"); - final File tmpFile = Files.createTempFile("gzTest", ".gz").toFile(); - tmpFile.deleteOnExit(); + final File tmpFile = temporaryFolder.newFile("gzTest.gz"); + try (OutputStream outputStream = new GZIPOutputStream(new FileOutputStream(tmpFile))) { outputStream.write(value); } @@ -139,39 +145,41 @@ public class S3DataSegmentPullerTest object0.setLastModifiedDate(new Date(0)); object0.setDataInputStream(new FileInputStream(tmpFile)); - File tmpDir = Files.createTempDirectory("gzTestDir").toFile(); + File tmpDir = temporaryFolder.newFolder("gzTestDir"); S3ServiceException exception = new S3ServiceException(); exception.setErrorCode("NoSuchKey"); exception.setResponseCode(404); - try { - EasyMock.expect(s3Client.getObjectDetails(EasyMock.anyObject(), EasyMock.eq(object0.getKey()))) - .andReturn(null) - .once(); - EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))) - .andThrow(exception) - .once() - .andReturn(object0) - .once(); - S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); + EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) + .andReturn(null) + .once(); + EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) + .andReturn(object0) + .once(); + EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))) + .andThrow(exception) + .once(); + EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) + .andReturn(object0) + .once(); + EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))) + .andReturn(object0) + .once(); + S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); - EasyMock.replay(s3Client); - FileUtils.FileCopyResult result = puller.getSegmentFiles( - new S3DataSegmentPuller.S3Coords( - bucket, - object0.getKey() - ), tmpDir - ); - EasyMock.verify(s3Client); + EasyMock.replay(s3Client); + FileUtils.FileCopyResult result = puller.getSegmentFiles( + new S3DataSegmentPuller.S3Coords( + bucket, + object0.getKey() + ), tmpDir + ); + EasyMock.verify(s3Client); - Assert.assertEquals(value.length, result.size()); - File expected = new File(tmpDir, "renames-0"); - Assert.assertTrue(expected.exists()); - Assert.assertEquals(value.length, expected.length()); - } - finally { - org.apache.commons.io.FileUtils.deleteDirectory(tmpDir); - } + Assert.assertEquals(value.length, result.size()); + File expected = new File(tmpDir, "renames-0"); + Assert.assertTrue(expected.exists()); + Assert.assertEquals(value.length, expected.length()); } }