S3: Improvements to prefix listing (including fix for an infinite loop) (#9098)

* S3: Improvements to prefix listing (including fix for an infinite loop)

1) Fixes #9097, an infinite loop that occurs when more than one batch
   of objects is retrieved during a prefix listing.

2) Removes the Access Denied fallback code added in #4444. I don't think
   the behavior is reasonable: its purpose is to fall back from a prefix
   listing to a single-object access, but it's only activated when the
   end user supplied a prefix, so it would be better to simply fail, so
   the end user knows that their request for a prefix-based load is not
   going to work. Presumably the end user can switch from supplying
   'prefixes' to supplying 'uris' if desired.

3) Filters out directory placeholders when walking prefixes.

4) Splits LazyObjectSummariesIterator into its own class and adds tests.

* Adjust S3InputSourceTest.

* Changes from review.

* Include hamcrest-core.
This commit is contained in:
Gian Merlino 2019-12-31 19:06:49 -05:00 committed by GitHub
parent dec619ebf4
commit 18eb456fe6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 516 additions and 251 deletions

View File

@ -155,6 +155,11 @@
<artifactId>joda-time</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -89,6 +89,6 @@ public class S3InputSource extends CloudObjectInputSource<S3Entity>
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
{
return () -> S3Utils.lazyFetchingObjectSummariesIterator(s3Client, getPrefixes().iterator(), MAX_LISTING_LENGTH);
return () -> S3Utils.objectSummaryIterator(s3Client, getPrefixes(), MAX_LISTING_LENGTH);
}
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.firehose.s3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.annotation.JacksonInject;
@ -29,13 +28,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.s3.S3Utils;
@ -112,53 +109,20 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
}
@Override
protected Collection<URI> initObjects() throws IOException
protected Collection<URI> initObjects()
{
if (!uris.isEmpty()) {
return uris;
} else {
final List<S3ObjectSummary> objects = new ArrayList<>();
for (URI uri : prefixes) {
final String bucket = uri.getAuthority();
final String prefix = S3Utils.extractS3Key(uri);
for (final URI prefix : prefixes) {
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
s3Client,
Collections.singletonList(prefix),
MAX_LISTING_LENGTH
);
try {
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
s3Client,
uri,
MAX_LISTING_LENGTH
);
objects.addAll(Lists.newArrayList(objectSummaryIterator));
}
catch (AmazonS3Exception outerException) {
log.error(outerException, "Exception while listing on %s", uri);
if (outerException.getStatusCode() == 403) {
// The "Access Denied" means users might not have a proper permission for listing on the given uri.
// Usually this is not a problem, but the uris might be the full paths to input objects instead of prefixes.
// In this case, users should be able to get objects if they have a proper permission for GetObject.
log.warn("Access denied for %s. Try to get the object from the uri without listing", uri);
try {
final ObjectMetadata objectMetadata = s3Client.getObjectMetadata(bucket, prefix);
if (!S3Utils.isDirectoryPlaceholder(prefix, objectMetadata)) {
objects.add(S3Utils.getSingleObjectSummary(s3Client, bucket, prefix));
} else {
throw new IOE(
"[%s] is a directory placeholder, "
+ "but failed to get the object list under the directory due to permission",
uri
);
}
}
catch (AmazonS3Exception innerException) {
throw new IOException(innerException);
}
} else {
throw new IOException(outerException);
}
}
objectSummaryIterator.forEachRemaining(objects::add);
}
return objects.stream().map(S3Utils::summaryToUri).collect(Collectors.toList());
}

View File

@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.s3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.druid.java.util.common.RE;
import java.net.URI;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* Iterator class used by {@link S3Utils#objectSummaryIterator}.
*
* As required by the specification of that method, this iterator is computed incrementally in batches of
* {@code maxListLength}. The first call is made at the same time the iterator is constructed.
*/
public class ObjectSummaryIterator implements Iterator<S3ObjectSummary>
{
private final ServerSideEncryptingAmazonS3 s3Client;
private final Iterator<URI> prefixesIterator;
private final int maxListingLength;
private ListObjectsV2Request request;
private ListObjectsV2Result result;
private Iterator<S3ObjectSummary> objectSummaryIterator;
private S3ObjectSummary currentObjectSummary;
ObjectSummaryIterator(
final ServerSideEncryptingAmazonS3 s3Client,
final Iterable<URI> prefixes,
final int maxListingLength
)
{
this.s3Client = s3Client;
this.prefixesIterator = prefixes.iterator();
this.maxListingLength = maxListingLength;
prepareNextRequest();
fetchNextBatch();
advanceObjectSummary();
}
@Override
public boolean hasNext()
{
return currentObjectSummary != null;
}
@Override
public S3ObjectSummary next()
{
if (currentObjectSummary == null) {
throw new NoSuchElementException();
}
final S3ObjectSummary retVal = currentObjectSummary;
advanceObjectSummary();
return retVal;
}
private void prepareNextRequest()
{
final URI currentUri = prefixesIterator.next();
final String currentBucket = currentUri.getAuthority();
final String currentPrefix = S3Utils.extractS3Key(currentUri);
request = new ListObjectsV2Request()
.withBucketName(currentBucket)
.withPrefix(currentPrefix)
.withMaxKeys(maxListingLength);
}
private void fetchNextBatch()
{
try {
result = S3Utils.retryS3Operation(() -> s3Client.listObjectsV2(request));
request.setContinuationToken(result.getNextContinuationToken());
objectSummaryIterator = result.getObjectSummaries().iterator();
}
catch (AmazonS3Exception e) {
throw new RE(
e,
"Failed to get object summaries from S3 bucket[%s], prefix[%s]; S3 error: %s",
request.getBucketName(),
request.getPrefix(),
e.getMessage()
);
}
catch (Exception e) {
throw new RE(
e,
"Failed to get object summaries from S3 bucket[%s], prefix[%s]",
request.getBucketName(),
request.getPrefix()
);
}
}
/**
* Advance objectSummaryIterator to the next non-placeholder, updating "currentObjectSummary".
*/
private void advanceObjectSummary()
{
while (objectSummaryIterator.hasNext() || result.isTruncated() || prefixesIterator.hasNext()) {
while (objectSummaryIterator.hasNext()) {
currentObjectSummary = objectSummaryIterator.next();
if (!isDirectoryPlaceholder(currentObjectSummary)) {
return;
}
}
// Exhausted "objectSummaryIterator" without finding a non-placeholder.
if (result.isTruncated()) {
fetchNextBatch();
} else if (prefixesIterator.hasNext()) {
prepareNextRequest();
fetchNextBatch();
}
}
// Truly nothing left to read.
currentObjectSummary = null;
}
/**
* Checks if a given object is a directory placeholder and should be ignored.
*
* Adapted from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder(). Does not include the check for
* legacy JetS3t directory placeholder objects, since it is based on content-type, which isn't available in an
* S3ObjectSummary.
*/
private static boolean isDirectoryPlaceholder(final S3ObjectSummary objectSummary)
{
// Recognize "standard" directory place-holder indications used by Amazon's AWS Console and Panic's Transmit.
if (objectSummary.getKey().endsWith("/") && objectSummary.getSize() == 0) {
return true;
}
// Recognize s3sync.rb directory placeholders by MD5/ETag value.
if ("d66759af42f282e1ba19144df2d405d0".equals(objectSummary.getETag())) {
return true;
}
// Recognize place-holder objects created by the Google Storage console or S3 Organizer Firefox extension.
if (objectSummary.getKey().endsWith("_$folder$") && objectSummary.getSize() == 0) {
return true;
}
return false;
}
}

View File

@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.regex.Pattern;
@ -62,7 +63,7 @@ public class S3TimestampVersionedDataFinder extends S3DataSegmentPuller implemen
URI latest = null;
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
s3Client,
uri,
Collections.singletonList(uri),
MAX_LISTING_KEYS
);
while (objectSummaryIterator.hasNext()) {

View File

@ -26,16 +26,13 @@ import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.RetryUtils.Task;
import org.apache.druid.java.util.common.StringUtils;
@ -45,7 +42,6 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
*
@ -54,7 +50,6 @@ public class S3Utils
{
private static final String SCHEME = S3StorageDruidModule.SCHEME;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private static final String MIMETYPE_JETS3T_DIRECTORY = "application/x-directory";
private static final Logger log = new Logger(S3Utils.class);
@ -111,142 +106,24 @@ public class S3Utils
}
}
/**
* Create an iterator over a set of S3 objects specified by a set of prefixes.
*
* For each provided prefix URI, the iterator will walk through all objects that are in the same bucket as the
* provided URI and whose keys start with that URI's path, except for directory placeholders (which will be
* ignored). The iterator is computed incrementally by calling {@link ServerSideEncryptingAmazonS3#listObjectsV2} for
* each prefix in batches of {@param maxListLength}. The first call is made at the same time the iterator is
* constructed.
*/
public static Iterator<S3ObjectSummary> objectSummaryIterator(
final ServerSideEncryptingAmazonS3 s3Client,
final URI prefix,
final int numMaxKeys
)
{
return lazyFetchingObjectSummariesIterator(s3Client, Iterators.singletonIterator(prefix), numMaxKeys);
}
/**
* Create an iterator over a set of s3 objects specified by a set of 'prefixes' which may be paths or individual
* objects, in order to get {@link S3ObjectSummary} for each discovered object. This iterator is computed lazily as it
* is iterated, calling {@link ServerSideEncryptingAmazonS3#listObjectsV2} for each prefix in batches of
* {@param maxListLength}, falling back to {@link ServerSideEncryptingAmazonS3#getObjectMetadata} if the list API
* returns a 403 status code as a fallback to check if the URI is a single object instead of a directory. These
* summaries are supplied to the outer iterator until drained, then if additional results for the current prefix are
* still available, it will continue fetching and repeat the process, else it will move on to the next prefix,
* continuing until all objects have been evaluated.
*/
public static Iterator<S3ObjectSummary> lazyFetchingObjectSummariesIterator(
final ServerSideEncryptingAmazonS3 s3Client,
final Iterator<URI> uris,
final Iterable<URI> prefixes,
final int maxListingLength
)
{
return new Iterator<S3ObjectSummary>()
{
private ListObjectsV2Request request;
private ListObjectsV2Result result;
private URI currentUri;
private String currentBucket;
private String currentPrefix;
private Iterator<S3ObjectSummary> objectSummaryIterator;
{
prepareNextRequest();
fetchNextBatch();
}
private void prepareNextRequest()
{
currentUri = uris.next();
currentBucket = currentUri.getAuthority();
currentPrefix = S3Utils.extractS3Key(currentUri);
request = new ListObjectsV2Request()
.withBucketName(currentBucket)
.withPrefix(currentPrefix)
.withMaxKeys(maxListingLength);
}
private void fetchNextBatch()
{
try {
result = S3Utils.retryS3Operation(() -> s3Client.listObjectsV2(request));
objectSummaryIterator = result.getObjectSummaries().iterator();
request.setContinuationToken(result.getContinuationToken());
}
catch (AmazonS3Exception outerException) {
log.error(outerException, "Exception while listing on %s", currentUri);
if (outerException.getStatusCode() == 403) {
// The "Access Denied" means users might not have a proper permission for listing on the given uri.
// Usually this is not a problem, but the uris might be the full paths to input objects instead of prefixes.
// In this case, users should be able to get objects if they have a proper permission for GetObject.
log.warn("Access denied for %s. Try to get the object from the uri without listing", currentUri);
try {
final ObjectMetadata objectMetadata =
S3Utils.retryS3Operation(() -> s3Client.getObjectMetadata(currentBucket, currentPrefix));
if (!S3Utils.isDirectoryPlaceholder(currentPrefix, objectMetadata)) {
// it's not a directory, so just generate an object summary
S3ObjectSummary fabricated = new S3ObjectSummary();
fabricated.setBucketName(currentBucket);
fabricated.setKey(currentPrefix);
objectSummaryIterator = Iterators.singletonIterator(fabricated);
} else {
throw new RE(
"[%s] is a directory placeholder, "
+ "but failed to get the object list under the directory due to permission",
currentUri
);
}
}
catch (Exception innerException) {
throw new RuntimeException(innerException);
}
} else {
throw new RuntimeException(outerException);
}
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
public boolean hasNext()
{
return objectSummaryIterator.hasNext() || result.isTruncated() || uris.hasNext();
}
@Override
public S3ObjectSummary next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
if (objectSummaryIterator.hasNext()) {
return objectSummaryIterator.next();
}
if (result.isTruncated()) {
fetchNextBatch();
} else if (uris.hasNext()) {
prepareNextRequest();
fetchNextBatch();
}
if (!objectSummaryIterator.hasNext()) {
throw new ISE(
"Failed to further iterate on bucket[%s] and prefix[%s]. The last continuationToken was [%s]",
currentBucket,
currentPrefix,
result.getContinuationToken()
);
}
return objectSummaryIterator.next();
}
};
return new ObjectSummaryIterator(s3Client, prefixes, maxListingLength);
}
/**
* Create an {@link URI} from the given {@link S3ObjectSummary}. The result URI is composed as below.
*
@ -292,34 +169,6 @@ public class S3Utils
return CloudObjectLocation.validateUriScheme(SCHEME, uri);
}
// Copied from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder()
public static boolean isDirectoryPlaceholder(String key, ObjectMetadata objectMetadata)
{
// Recognize "standard" directory place-holder indications used by
// Amazon's AWS Console and Panic's Transmit.
if (key.endsWith("/") && objectMetadata.getContentLength() == 0) {
return true;
}
// Recognize s3sync.rb directory placeholders by MD5/ETag value.
if ("d66759af42f282e1ba19144df2d405d0".equals(objectMetadata.getETag())) {
return true;
}
// Recognize place-holder objects created by the Google Storage console
// or S3 Organizer Firefox extension.
if (key.endsWith("_$folder$") && objectMetadata.getContentLength() == 0) {
return true;
}
// We don't use JetS3t APIs anymore, but the below check is still needed for backward compatibility.
// Recognize legacy JetS3t directory place-holder objects, only gives
// accurate results if an object's metadata is populated.
if (objectMetadata.getContentLength() == 0 && MIMETYPE_JETS3T_DIRECTORY.equals(objectMetadata.getContentType())) {
return true;
}
return false;
}
/**
* Gets a single {@link S3ObjectSummary} from s3. Since this method might return a wrong object if there are multiple
* objects that match the given key, this method should be used only when it's guaranteed that the given key is unique
@ -354,10 +203,10 @@ public class S3Utils
/**
* Uploads a file to S3 if possible. First trying to set ACL to give the bucket owner full control of the file before uploading.
*
* @param service S3 client
* @param service S3 client
* @param disableAcl true if ACL shouldn't be set for the file
* @param key The key under which to store the new object.
* @param file The path of the file to upload to Amazon S3.
* @param key The key under which to store the new object.
* @param file The path of the file to upload to Amazon S3.
*/
static void uploadFileIfPossible(
ServerSideEncryptingAmazonS3 service,

View File

@ -21,13 +21,10 @@ package org.apache.druid.data.input.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.core.JsonParser;
@ -62,10 +59,13 @@ import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.utils.CompressionUtils;
import org.easymock.EasyMock;
import org.easymock.IArgumentMatcher;
import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
@ -81,7 +81,7 @@ import java.util.stream.Stream;
public class S3InputSourceTest extends InitializedNullHandlingTest
{
private static final ObjectMapper MAPPER = createS3ObjectMapper();
private static final AmazonS3Client S3_CLIENT = EasyMock.createNiceMock(AmazonS3Client.class);
private static final AmazonS3Client S3_CLIENT = EasyMock.createMock(AmazonS3Client.class);
private static final ServerSideEncryptingAmazonS3 SERVICE = new ServerSideEncryptingAmazonS3(
S3_CLIENT,
new NoopServerSideEncryption()
@ -165,7 +165,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
}
@Test
public void testSerdeWithInvalidArgs() throws Exception
public void testSerdeWithInvalidArgs()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
@ -220,8 +220,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
public void testWithPrefixesSplit()
{
EasyMock.reset(S3_CLIENT);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(SERVICE, null, PREFIXES, null);
@ -232,14 +232,15 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
EasyMock.verify(S3_CLIENT);
}
@Test
public void testWithPrefixesWhereOneIsUrisAndNoListPermissionSplit()
public void testAccessDeniedWhileListingPrefix()
{
EasyMock.reset(S3_CLIENT);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
addExpectedNonPrefixObjectsWithNoListPermission();
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
expectListObjectsAndThrowAccessDenied(EXPECTED_URIS.get(1));
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
@ -249,22 +250,25 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null
);
Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
null
expectedException.expectMessage("Failed to get object summaries from S3 bucket[bar], prefix[foo/file2.csv]");
expectedException.expectCause(
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("can't list that bucket"))
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
null
).collect(Collectors.toList());
}
@Test
public void testReader() throws IOException
{
EasyMock.reset(S3_CLIENT);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
addExpectedNonPrefixObjectsWithNoListPermission();
addExpectedGetObjectMock(EXPECTED_URIS.get(0));
addExpectedGetObjectMock(EXPECTED_URIS.get(1));
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
expectListObjects(EXPECTED_URIS.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
expectGetObject(EXPECTED_URIS.get(0));
expectGetObject(EXPECTED_URIS.get(1));
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
@ -294,16 +298,18 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
EasyMock.verify(S3_CLIENT);
}
@Test
public void testCompressedReader() throws IOException
{
EasyMock.reset(S3_CLIENT);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)));
addExpectedNonPrefixObjectsWithNoListPermission();
addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(0));
addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(1));
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)));
expectListObjects(EXPECTED_COMPRESSED_URIS.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)));
expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(0));
expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(1));
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
@ -333,40 +339,39 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
EasyMock.verify(S3_CLIENT);
}
private static void addExpectedPrefixObjects(URI prefix, List<URI> uris)
private static void expectListObjects(URI prefix, List<URI> uris)
{
final String s3Bucket = prefix.getAuthority();
final ListObjectsV2Result result = new ListObjectsV2Result();
result.setBucketName(s3Bucket);
result.setKeyCount(1);
result.setBucketName(prefix.getAuthority());
result.setKeyCount(uris.size());
for (URI uri : uris) {
final String bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(s3Bucket);
objectSummary.setBucketName(bucket);
objectSummary.setKey(key);
result.getObjectSummaries().add(objectSummary);
}
EasyMock.expect(S3_CLIENT.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))).andReturn(result).once();
EasyMock.expect(
S3_CLIENT.listObjectsV2(matchListObjectsRequest(prefix))
).andReturn(result).once();
}
private static void addExpectedNonPrefixObjectsWithNoListPermission()
private static void expectListObjectsAndThrowAccessDenied(final URI prefix)
{
AmazonS3Exception boom = new AmazonS3Exception("oh dang, you can't list that bucket friend");
boom.setStatusCode(403);
EasyMock.expect(S3_CLIENT.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))).andThrow(boom).once();
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(CONTENT.length);
metadata.setContentEncoding("text/csv");
metadata.setHeader(Headers.ETAG, "some-totally-real-etag-base64-hash-i-guess");
EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject(GetObjectMetadataRequest.class)))
.andReturn(metadata)
.once();
EasyMock.expect(
S3_CLIENT.listObjectsV2(matchListObjectsRequest(prefix))
).andThrow(boom).once();
}
private static void addExpectedGetObjectMock(URI uri)
private static void expectGetObject(URI uri)
{
final String s3Bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
@ -378,7 +383,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).once();
}
private static void addExpectedGetCompressedObjectMock(URI uri) throws IOException
private static void expectGetObjectCompressed(URI uri) throws IOException
{
final String s3Bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
@ -392,6 +397,35 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).once();
}
private static ListObjectsV2Request matchListObjectsRequest(final URI prefixUri)
{
// Use an IArgumentMatcher to verify that the request has the correct bucket and prefix.
EasyMock.reportMatcher(
new IArgumentMatcher()
{
@Override
public boolean matches(Object argument)
{
if (!(argument instanceof ListObjectsV2Request)) {
return false;
}
final ListObjectsV2Request request = (ListObjectsV2Request) argument;
return prefixUri.getAuthority().equals(request.getBucketName())
&& S3Utils.extractS3Key(prefixUri).equals(request.getPrefix());
}
@Override
public void appendTo(StringBuffer buffer)
{
buffer.append("<request for prefix [").append(prefixUri).append("]>");
}
}
);
return null;
}
public static ObjectMapper createS3ObjectMapper()
{
DruidModule baseModule = new TestS3Module();

View File

@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.s3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class ObjectSummaryIteratorTest
{
private static final ImmutableList<S3ObjectSummary> TEST_OBJECTS =
ImmutableList.of(
makeObjectSummary("b", "foo", 10L),
makeObjectSummary("b", "foo/", 0L),
makeObjectSummary("b", "foo/bar1", 10L),
makeObjectSummary("b", "foo/bar2", 10L),
makeObjectSummary("b", "foo/bar3", 10L),
makeObjectSummary("b", "foo/bar4", 10L),
makeObjectSummary("b", "foo/baz", 10L),
makeObjectSummary("bucketnotmine", "a/different/bucket", 10L)
);
@Test
public void testSingleObject()
{
test(
ImmutableList.of("s3://b/foo/baz"),
ImmutableList.of("s3://b/foo/baz"),
5
);
}
@Test
public void testMultiObjectOneKeyAtATime()
{
test(
ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4", "s3://b/foo/baz"),
ImmutableList.of("s3://b/foo/"),
1
);
}
@Test
public void testMultiObjectTwoKeysAtATime()
{
test(
ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4", "s3://b/foo/baz"),
ImmutableList.of("s3://b/foo/"),
2
);
}
@Test
public void testMultiObjectTenKeysAtATime()
{
test(
ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4", "s3://b/foo/baz"),
ImmutableList.of("s3://b/foo/"),
10
);
}
@Test
public void testPrefixInMiddleOfKey()
{
test(
ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4"),
ImmutableList.of("s3://b/foo/bar"),
10
);
}
@Test
public void testNoPath()
{
test(
ImmutableList.of(
"s3://b/foo",
"s3://b/foo/bar1",
"s3://b/foo/bar2",
"s3://b/foo/bar3",
"s3://b/foo/bar4",
"s3://b/foo/baz"
),
ImmutableList.of("s3://b"),
10
);
}
@Test
public void testSlashPath()
{
test(
ImmutableList.of(
"s3://b/foo",
"s3://b/foo/bar1",
"s3://b/foo/bar2",
"s3://b/foo/bar3",
"s3://b/foo/bar4",
"s3://b/foo/baz"
),
ImmutableList.of("s3://b/"),
10
);
}
@Test
public void testDifferentBucket()
{
test(
ImmutableList.of(),
ImmutableList.of("s3://bx/foo/"),
10
);
}
private static void test(
final List<String> expectedUris,
final List<String> prefixes,
final int maxListingLength
)
{
final List<S3ObjectSummary> expectedObjects = new ArrayList<>();
// O(N^2) but who cares -- the list is short.
for (final String uri : expectedUris) {
final List<S3ObjectSummary> matches = TEST_OBJECTS.stream()
.filter(
summary ->
S3Utils.summaryToUri(summary).toString().equals(uri)
)
.collect(Collectors.toList());
expectedObjects.add(Iterables.getOnlyElement(matches));
}
final List<S3ObjectSummary> actualObjects = ImmutableList.copyOf(
S3Utils.objectSummaryIterator(
makeMockClient(TEST_OBJECTS),
prefixes.stream().map(URI::create).collect(Collectors.toList()),
maxListingLength
)
);
Assert.assertEquals(
prefixes.toString(),
expectedObjects.stream().map(S3Utils::summaryToUri).collect(Collectors.toList()),
actualObjects.stream().map(S3Utils::summaryToUri).collect(Collectors.toList())
);
}
/**
* Makes a mock S3 client that handles enough of "listObjectsV2" to test the functionality of the
* {@link ObjectSummaryIterator} class.
*/
private static ServerSideEncryptingAmazonS3 makeMockClient(
final List<S3ObjectSummary> objects
)
{
return new ServerSideEncryptingAmazonS3(null, null)
{
@Override
public ListObjectsV2Result listObjectsV2(final ListObjectsV2Request request)
{
// Continuation token is an index in the "objects" list.
final String continuationToken = request.getContinuationToken();
final int startIndex = continuationToken == null ? 0 : Integer.parseInt(continuationToken);
// Find matching objects.
final List<S3ObjectSummary> summaries = new ArrayList<>();
int nextIndex = -1;
for (int i = startIndex; i < objects.size(); i++) {
final S3ObjectSummary summary = objects.get(i);
if (summary.getBucketName().equals(request.getBucketName())
&& summary.getKey().startsWith(request.getPrefix())) {
if (summaries.size() == request.getMaxKeys()) {
// We reached our max key limit; set nextIndex (which will lead to a result with truncated = true).
nextIndex = i;
break;
}
// Generate a summary.
summaries.add(summary);
}
}
// Generate the result.
final ListObjectsV2Result retVal = new ListObjectsV2Result();
retVal.setContinuationToken(continuationToken);
retVal.getObjectSummaries().addAll(summaries);
if (nextIndex >= 0) {
retVal.setTruncated(true);
retVal.setNextContinuationToken(String.valueOf(nextIndex));
}
return retVal;
}
};
}
private static S3ObjectSummary makeObjectSummary(final String bucket, final String key, final long size)
{
final S3ObjectSummary summary = new S3ObjectSummary();
summary.setBucketName(bucket);
summary.setKey(key);
summary.setSize(size);
return summary;
}
}