Make S3DataSegmentPuller do GET requests less often (#2900)

* Make S3DataSegmentPuller do GET requests less often
* Fixes #2894

* Run intellij formatting on S3Utils

* Remove forced stream fetching on getVersion

* Remove unneeded finalize

* Allow initial object fetching to fail and be retried
This commit is contained in:
Charles Allen 2016-05-04 16:21:35 -07:00 committed by Fangjin Yang
parent 035134d070
commit 2a769a9fb7
3 changed files with 104 additions and 104 deletions

View File

@ -30,18 +30,13 @@ import com.metamx.common.FileUtils;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.MapUtils; import com.metamx.common.MapUtils;
import com.metamx.common.StringUtils;
import com.metamx.common.UOE; import com.metamx.common.UOE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.URIDataPuller; import io.druid.segment.loading.URIDataPuller;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import javax.tools.FileObject;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -51,6 +46,11 @@ import java.io.Writer;
import java.net.URI; import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import javax.tools.FileObject;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.StorageObject;
/** /**
* A data segment puller that also hanldes URI data pulls. * A data segment puller that also hanldes URI data pulls.
@ -59,15 +59,17 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller
{ {
public static final int DEFAULT_RETRY_COUNT = 3; public static final int DEFAULT_RETRY_COUNT = 3;
public static FileObject buildFileObject(final URI uri, final RestS3Service s3Client) throws S3ServiceException public static FileObject buildFileObject(final URI uri, final RestS3Service s3Client) throws ServiceException
{ {
final S3Coords coords = new S3Coords(checkURI(uri)); final S3Coords coords = new S3Coords(checkURI(uri));
final S3Object s3Obj = s3Client.getObject(coords.bucket, coords.path); final StorageObject s3Obj = s3Client.getObjectDetails(coords.bucket, coords.path);
final String path = uri.getPath(); final String path = uri.getPath();
return new FileObject() return new FileObject()
{ {
final Object inputStreamOpener = new Object();
volatile boolean streamAcquired = false; volatile boolean streamAcquired = false;
volatile StorageObject storageObject = s3Obj;
@Override @Override
public URI toUri() public URI toUri()
@ -86,11 +88,19 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller
public InputStream openInputStream() throws IOException public InputStream openInputStream() throws IOException
{ {
try { try {
streamAcquired = true; synchronized (inputStreamOpener) {
return s3Obj.getDataInputStream(); if (streamAcquired) {
return storageObject.getDataInputStream();
}
// lazily promote to full GET
storageObject = s3Client.getObject(s3Obj.getBucketName(), s3Obj.getKey());
final InputStream stream = storageObject.getDataInputStream();
streamAcquired = true;
return stream;
}
} }
catch (ServiceException e) { catch (ServiceException e) {
throw new IOException(String.format("Could not load S3 URI [%s]", uri), e); throw new IOException(StringUtils.safeFormat("Could not load S3 URI [%s]", uri), e);
} }
} }
@ -129,19 +139,6 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller
{ {
throw new UOE("Cannot delete S3 items anonymously. jetS3t doesn't support authenticated deletes easily."); throw new UOE("Cannot delete S3 items anonymously. jetS3t doesn't support authenticated deletes easily.");
} }
@Override
public void finalize() throws Throwable
{
try {
if (!streamAcquired) {
s3Obj.closeDataInputStream();
}
}
finally {
super.finalize();
}
}
}; };
} }
@ -220,7 +217,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller
final String fname = Files.getNameWithoutExtension(uri.getPath()); final String fname = Files.getNameWithoutExtension(uri.getPath());
final File outFile = new File(outDir, fname); final File outFile = new File(outDir, fname);
final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile); final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile, S3Utils.S3RETRY);
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outFile.getAbsolutePath()); log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outFile.getAbsolutePath());
return result; return result;
} }
@ -301,12 +298,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller
{ {
try { try {
final FileObject object = buildFileObject(uri, s3Client); final FileObject object = buildFileObject(uri, s3Client);
// buildFileObject has a hidden input stream that gets open deep in jets3t. This helps prevent resource leaks return String.format("%d", object.getLastModified());
try (InputStream nullStream = object.openInputStream()) {
return String.format("%d", object.getLastModified());
}
} }
catch (S3ServiceException e) { catch (ServiceException e) {
if (S3Utils.isServiceExceptionRecoverable(e)) { if (S3Utils.isServiceExceptionRecoverable(e)) {
// The recoverable logic is always true for IOException, so we want to only pass IOException if it is recoverable // The recoverable logic is always true for IOException, so we want to only pass IOException if it is recoverable
throw new IOException( throw new IOException(

View File

@ -24,13 +24,11 @@ import com.google.common.base.Predicate;
import com.metamx.common.RetryUtils; import com.metamx.common.RetryUtils;
import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
/** /**
* *
@ -91,7 +89,7 @@ public class S3Utils
throws ServiceException throws ServiceException
{ {
try { try {
s3Client.getObjectDetails(new S3Bucket(bucketName), objectKey); s3Client.getObjectDetails(bucketName, objectKey);
} }
catch (ServiceException e) { catch (ServiceException e) {
if (404 == e.getResponseCode() if (404 == e.getResponseCode()

View File

@ -21,31 +21,35 @@ package io.druid.storage.s3;
import com.metamx.common.FileUtils; import com.metamx.common.FileUtils;
import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoadingException;
import org.easymock.EasyMock;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.junit.Assert;
import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import java.nio.file.Files;
import java.util.Date; import java.util.Date;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
import org.easymock.EasyMock;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
/** /**
* *
*/ */
public class S3DataSegmentPullerTest public class S3DataSegmentPullerTest
{ {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test @Test
public void testSimpleGetVersion() throws S3ServiceException, IOException public void testSimpleGetVersion() throws ServiceException, IOException
{ {
String bucket = "bucket"; String bucket = "bucket";
String keyPrefix = "prefix/dir/0"; String keyPrefix = "prefix/dir/0";
@ -57,7 +61,9 @@ public class S3DataSegmentPullerTest
object0.setKey(keyPrefix + "/renames-0.gz"); object0.setKey(keyPrefix + "/renames-0.gz");
object0.setLastModifiedDate(new Date(0)); object0.setLastModifiedDate(new Date(0));
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))).andReturn(object0).once(); EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
.andReturn(object0)
.once();
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
EasyMock.replay(s3Client); EasyMock.replay(s3Client);
@ -70,64 +76,64 @@ public class S3DataSegmentPullerTest
} }
@Test @Test
public void testGZUncompress() throws S3ServiceException, IOException, SegmentLoadingException public void testGZUncompress() throws ServiceException, IOException, SegmentLoadingException
{ {
final String bucket = "bucket"; final String bucket = "bucket";
final String keyPrefix = "prefix/dir/0"; final String keyPrefix = "prefix/dir/0";
final RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); final RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
final byte[] value = bucket.getBytes("utf8"); final byte[] value = bucket.getBytes("utf8");
final File tmpFile = Files.createTempFile("gzTest", ".gz").toFile(); final File tmpFile = temporaryFolder.newFile("gzTest.gz");
tmpFile.deleteOnExit();
try (OutputStream outputStream = new GZIPOutputStream(new FileOutputStream(tmpFile))) { try (OutputStream outputStream = new GZIPOutputStream(new FileOutputStream(tmpFile))) {
outputStream.write(value); outputStream.write(value);
} }
S3Object object0 = new S3Object(); final S3Object object0 = new S3Object();
object0.setBucketName(bucket); object0.setBucketName(bucket);
object0.setKey(keyPrefix + "/renames-0.gz"); object0.setKey(keyPrefix + "/renames-0.gz");
object0.setLastModifiedDate(new Date(0)); object0.setLastModifiedDate(new Date(0));
object0.setDataInputStream(new FileInputStream(tmpFile)); object0.setDataInputStream(new FileInputStream(tmpFile));
File tmpDir = Files.createTempDirectory("gzTestDir").toFile(); final File tmpDir = temporaryFolder.newFolder("gzTestDir");
try { EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
EasyMock.expect(s3Client.getObjectDetails(EasyMock.<S3Bucket>anyObject(), EasyMock.eq(object0.getKey()))) .andReturn(null)
.andReturn(null) .once();
.once(); EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))).andReturn(object0).once(); .andReturn(object0)
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); .once();
EasyMock.expect(s3Client.getObject(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(object0)
.once();
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
EasyMock.replay(s3Client); EasyMock.replay(s3Client);
FileUtils.FileCopyResult result = puller.getSegmentFiles( FileUtils.FileCopyResult result = puller.getSegmentFiles(
new S3DataSegmentPuller.S3Coords( new S3DataSegmentPuller.S3Coords(
bucket, bucket,
object0.getKey() object0.getKey()
), tmpDir ), tmpDir
); );
EasyMock.verify(s3Client); EasyMock.verify(s3Client);
Assert.assertEquals(value.length, result.size()); Assert.assertEquals(value.length, result.size());
File expected = new File(tmpDir, "renames-0"); File expected = new File(tmpDir, "renames-0");
Assert.assertTrue(expected.exists()); Assert.assertTrue(expected.exists());
Assert.assertEquals(value.length, expected.length()); Assert.assertEquals(value.length, expected.length());
}
finally {
org.apache.commons.io.FileUtils.deleteDirectory(tmpDir);
}
} }
@Test @Test
public void testGZUncompressRetries() throws S3ServiceException, IOException, SegmentLoadingException public void testGZUncompressRetries() throws ServiceException, IOException, SegmentLoadingException
{ {
final String bucket = "bucket"; final String bucket = "bucket";
final String keyPrefix = "prefix/dir/0"; final String keyPrefix = "prefix/dir/0";
final RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); final RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
final byte[] value = bucket.getBytes("utf8"); final byte[] value = bucket.getBytes("utf8");
final File tmpFile = Files.createTempFile("gzTest", ".gz").toFile(); final File tmpFile = temporaryFolder.newFile("gzTest.gz");
tmpFile.deleteOnExit();
try (OutputStream outputStream = new GZIPOutputStream(new FileOutputStream(tmpFile))) { try (OutputStream outputStream = new GZIPOutputStream(new FileOutputStream(tmpFile))) {
outputStream.write(value); outputStream.write(value);
} }
@ -139,39 +145,41 @@ public class S3DataSegmentPullerTest
object0.setLastModifiedDate(new Date(0)); object0.setLastModifiedDate(new Date(0));
object0.setDataInputStream(new FileInputStream(tmpFile)); object0.setDataInputStream(new FileInputStream(tmpFile));
File tmpDir = Files.createTempDirectory("gzTestDir").toFile(); File tmpDir = temporaryFolder.newFolder("gzTestDir");
S3ServiceException exception = new S3ServiceException(); S3ServiceException exception = new S3ServiceException();
exception.setErrorCode("NoSuchKey"); exception.setErrorCode("NoSuchKey");
exception.setResponseCode(404); exception.setResponseCode(404);
try { EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
EasyMock.expect(s3Client.getObjectDetails(EasyMock.<S3Bucket>anyObject(), EasyMock.eq(object0.getKey()))) .andReturn(null)
.andReturn(null) .once();
.once(); EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))) .andReturn(object0)
.andThrow(exception) .once();
.once() EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
.andReturn(object0) .andThrow(exception)
.once(); .once();
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(object0)
.once();
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
.andReturn(object0)
.once();
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
EasyMock.replay(s3Client); EasyMock.replay(s3Client);
FileUtils.FileCopyResult result = puller.getSegmentFiles( FileUtils.FileCopyResult result = puller.getSegmentFiles(
new S3DataSegmentPuller.S3Coords( new S3DataSegmentPuller.S3Coords(
bucket, bucket,
object0.getKey() object0.getKey()
), tmpDir ), tmpDir
); );
EasyMock.verify(s3Client); EasyMock.verify(s3Client);
Assert.assertEquals(value.length, result.size()); Assert.assertEquals(value.length, result.size());
File expected = new File(tmpDir, "renames-0"); File expected = new File(tmpDir, "renames-0");
Assert.assertTrue(expected.exists()); Assert.assertTrue(expected.exists());
Assert.assertEquals(value.length, expected.length()); Assert.assertEquals(value.length, expected.length());
}
finally {
org.apache.commons.io.FileUtils.deleteDirectory(tmpDir);
}
} }
} }