mirror of https://github.com/apache/druid.git
Reduce list operation calls when pulling segments from S3 (#11899)
* Lazy lists * Fix objectsummary init
This commit is contained in:
parent
5baa22148e
commit
f9941c12c3
|
@ -71,7 +71,8 @@ public class S3DataSegmentPuller implements URIDataPuller
|
||||||
this.s3Client = s3Client;
|
this.s3Client = s3Client;
|
||||||
}
|
}
|
||||||
|
|
||||||
FileUtils.FileCopyResult getSegmentFiles(final CloudObjectLocation s3Coords, final File outDir) throws SegmentLoadingException
|
FileUtils.FileCopyResult getSegmentFiles(final CloudObjectLocation s3Coords, final File outDir)
|
||||||
|
throws SegmentLoadingException
|
||||||
{
|
{
|
||||||
|
|
||||||
log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
|
log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
|
||||||
|
@ -149,16 +150,15 @@ public class S3DataSegmentPuller implements URIDataPuller
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private FileObject buildFileObject(final URI uri) throws AmazonServiceException
|
public FileObject buildFileObject(final URI uri) throws AmazonServiceException
|
||||||
{
|
{
|
||||||
final CloudObjectLocation coords = new CloudObjectLocation(S3Utils.checkURI(uri));
|
final CloudObjectLocation coords = new CloudObjectLocation(S3Utils.checkURI(uri));
|
||||||
final S3ObjectSummary objectSummary =
|
|
||||||
S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath());
|
|
||||||
final String path = uri.getPath();
|
final String path = uri.getPath();
|
||||||
|
|
||||||
return new FileObject()
|
return new FileObject()
|
||||||
{
|
{
|
||||||
S3Object s3Object = null;
|
S3Object s3Object = null;
|
||||||
|
S3ObjectSummary objectSummary = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public URI toUri()
|
public URI toUri()
|
||||||
|
@ -182,7 +182,7 @@ public class S3DataSegmentPuller implements URIDataPuller
|
||||||
try {
|
try {
|
||||||
if (s3Object == null) {
|
if (s3Object == null) {
|
||||||
// lazily promote to full GET
|
// lazily promote to full GET
|
||||||
s3Object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey());
|
s3Object = s3Client.getObject(coords.getBucket(), coords.getPath());
|
||||||
}
|
}
|
||||||
|
|
||||||
final InputStream in = s3Object.getObjectContent();
|
final InputStream in = s3Object.getObjectContent();
|
||||||
|
@ -231,6 +231,13 @@ public class S3DataSegmentPuller implements URIDataPuller
|
||||||
@Override
|
@Override
|
||||||
public long getLastModified()
|
public long getLastModified()
|
||||||
{
|
{
|
||||||
|
if (s3Object != null) {
|
||||||
|
return s3Object.getObjectMetadata().getLastModified().getTime();
|
||||||
|
}
|
||||||
|
if (objectSummary == null) {
|
||||||
|
objectSummary =
|
||||||
|
S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath());
|
||||||
|
}
|
||||||
return objectSummary.getLastModified().getTime();
|
return objectSummary.getLastModified().getTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,9 +259,7 @@ public class S3DataSegmentPuller implements URIDataPuller
|
||||||
* Returns the "version" (aka last modified timestamp) of the URI
|
* Returns the "version" (aka last modified timestamp) of the URI
|
||||||
*
|
*
|
||||||
* @param uri The URI to check the last timestamp
|
* @param uri The URI to check the last timestamp
|
||||||
*
|
|
||||||
* @return The time in ms of the last modification of the URI in String format
|
* @return The time in ms of the last modification of the URI in String format
|
||||||
*
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -24,6 +24,7 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||||
import com.amazonaws.services.s3.model.S3Object;
|
import com.amazonaws.services.s3.model.S3Object;
|
||||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||||
import org.apache.druid.java.util.common.FileUtils;
|
import org.apache.druid.java.util.common.FileUtils;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
|
@ -38,6 +39,7 @@ import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
@ -107,18 +109,11 @@ public class S3DataSegmentPullerTest
|
||||||
objectSummary.setKey(keyPrefix + "/renames-0.gz");
|
objectSummary.setKey(keyPrefix + "/renames-0.gz");
|
||||||
objectSummary.setLastModified(new Date(0));
|
objectSummary.setLastModified(new Date(0));
|
||||||
|
|
||||||
final ListObjectsV2Result listObjectsResult = new ListObjectsV2Result();
|
|
||||||
listObjectsResult.setKeyCount(1);
|
|
||||||
listObjectsResult.getObjectSummaries().add(objectSummary);
|
|
||||||
|
|
||||||
final File tmpDir = temporaryFolder.newFolder("gzTestDir");
|
final File tmpDir = temporaryFolder.newFolder("gzTestDir");
|
||||||
|
|
||||||
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
|
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
|
||||||
.andReturn(true)
|
.andReturn(true)
|
||||||
.once();
|
.once();
|
||||||
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
|
|
||||||
.andReturn(listObjectsResult)
|
|
||||||
.once();
|
|
||||||
EasyMock.expect(s3Client.getObject(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
|
EasyMock.expect(s3Client.getObject(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
|
||||||
.andReturn(object0)
|
.andReturn(object0)
|
||||||
.once();
|
.once();
|
||||||
|
@ -177,15 +172,9 @@ public class S3DataSegmentPullerTest
|
||||||
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
|
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
|
||||||
.andReturn(true)
|
.andReturn(true)
|
||||||
.once();
|
.once();
|
||||||
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
|
|
||||||
.andReturn(listObjectsResult)
|
|
||||||
.once();
|
|
||||||
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
|
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
|
||||||
.andThrow(exception)
|
.andThrow(exception)
|
||||||
.once();
|
.once();
|
||||||
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
|
|
||||||
.andReturn(listObjectsResult)
|
|
||||||
.once();
|
|
||||||
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
|
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
|
||||||
.andReturn(object0)
|
.andReturn(object0)
|
||||||
.once();
|
.once();
|
||||||
|
@ -206,4 +195,75 @@ public class S3DataSegmentPullerTest
|
||||||
Assert.assertEquals(value.length, expected.length());
|
Assert.assertEquals(value.length, expected.length());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testS3ObjectStream() throws IOException
|
||||||
|
{
|
||||||
|
final String bucket = "bucket";
|
||||||
|
final String keyPrefix = "prefix/dir/0";
|
||||||
|
final ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
|
||||||
|
final byte[] value = bucket.getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
final File tmpFile = temporaryFolder.newFile("testObjectFile");
|
||||||
|
|
||||||
|
try (OutputStream outputStream = new FileOutputStream(tmpFile)) {
|
||||||
|
outputStream.write(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
final S3Object object0 = new S3Object();
|
||||||
|
object0.setBucketName(bucket);
|
||||||
|
object0.setKey(keyPrefix + "/test-object");
|
||||||
|
object0.getObjectMetadata().setLastModified(new Date(0));
|
||||||
|
object0.setObjectContent(new FileInputStream(tmpFile));
|
||||||
|
|
||||||
|
EasyMock.expect(s3Client.getObject(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
|
||||||
|
.andReturn(object0)
|
||||||
|
.once();
|
||||||
|
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
|
||||||
|
EasyMock.replay(s3Client);
|
||||||
|
InputStream stream = puller.buildFileObject(URI.create(StringUtils.format("s3://%s/%s", bucket, object0.getKey())))
|
||||||
|
.openInputStream();
|
||||||
|
EasyMock.verify(s3Client);
|
||||||
|
Assert.assertEquals(bucket, IOUtils.toString(stream, StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testS3ObjectModifiedDate() throws IOException
|
||||||
|
{
|
||||||
|
final String bucket = "bucket";
|
||||||
|
final String keyPrefix = "prefix/dir/0";
|
||||||
|
final ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
|
||||||
|
final byte[] value = bucket.getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
final File tmpFile = temporaryFolder.newFile("testObjectFile");
|
||||||
|
|
||||||
|
try (OutputStream outputStream = new FileOutputStream(tmpFile)) {
|
||||||
|
outputStream.write(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
final S3Object object0 = new S3Object();
|
||||||
|
object0.setBucketName(bucket);
|
||||||
|
object0.setKey(keyPrefix + "/test-object");
|
||||||
|
object0.getObjectMetadata().setLastModified(new Date(0));
|
||||||
|
object0.setObjectContent(new FileInputStream(tmpFile));
|
||||||
|
|
||||||
|
final S3ObjectSummary objectSummary = new S3ObjectSummary();
|
||||||
|
objectSummary.setBucketName(bucket);
|
||||||
|
objectSummary.setKey(keyPrefix + "/test-object");
|
||||||
|
objectSummary.setLastModified(new Date(0));
|
||||||
|
|
||||||
|
final ListObjectsV2Result result = new ListObjectsV2Result();
|
||||||
|
result.setKeyCount(1);
|
||||||
|
result.getObjectSummaries().add(objectSummary);
|
||||||
|
|
||||||
|
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
|
||||||
|
.andReturn(result)
|
||||||
|
.once();
|
||||||
|
|
||||||
|
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
|
||||||
|
EasyMock.replay(s3Client);
|
||||||
|
long modifiedDate = puller.buildFileObject(URI.create(StringUtils.format("s3://%s/%s", bucket, object0.getKey())))
|
||||||
|
.getLastModified();
|
||||||
|
EasyMock.verify(s3Client);
|
||||||
|
Assert.assertEquals(0, modifiedDate);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue