diff --git a/extensions-core/s3-extensions/pom.xml b/extensions-core/s3-extensions/pom.xml
index ebbfed0e20a..9c6730270f1 100644
--- a/extensions-core/s3-extensions/pom.xml
+++ b/extensions-core/s3-extensions/pom.xml
@@ -155,6 +155,11 @@
<artifactId>joda-time</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <scope>test</scope>
+ </dependency>
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index 9642ebec108..a975a9e2e92 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -89,6 +89,6 @@ public class S3InputSource extends CloudObjectInputSource
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
{
- return () -> S3Utils.lazyFetchingObjectSummariesIterator(s3Client, getPrefixes().iterator(), MAX_LISTING_LENGTH);
+ return () -> S3Utils.objectSummaryIterator(s3Client, getPrefixes(), MAX_LISTING_LENGTH);
}
}
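
For context, the reworked `S3Utils.objectSummaryIterator` accepts the prefixes as an `Iterable<URI>` instead of a single URI. A minimal sketch of how a caller might walk the resulting summaries; the client setup and the batch size of 1000 are assumptions, only the call shape comes from this patch:

```java
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;

import java.net.URI;
import java.util.Collections;
import java.util.Iterator;

class ObjectListingSketch
{
  // Prints the URI of every non-placeholder object under a single prefix.
  // "s3Client" is assumed to be an already-configured ServerSideEncryptingAmazonS3.
  static void printObjects(ServerSideEncryptingAmazonS3 s3Client, URI prefix)
  {
    final Iterator<S3ObjectSummary> it =
        S3Utils.objectSummaryIterator(s3Client, Collections.singletonList(prefix), 1000);
    while (it.hasNext()) {
      System.out.println(S3Utils.summaryToUri(it.next()));
    }
  }
}
```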
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactory.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactory.java
index e202876f8ba..0bf93313dc6 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactory.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactory.java
@@ -21,7 +21,6 @@ package org.apache.druid.firehose.s3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.annotation.JacksonInject;
@@ -29,13 +28,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
-import com.google.common.collect.Lists;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.s3.S3Utils;
@@ -112,53 +109,20 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
}
@Override
- protected Collection<URI> initObjects() throws IOException
+ protected Collection<URI> initObjects()
{
if (!uris.isEmpty()) {
return uris;
} else {
final List<S3ObjectSummary> objects = new ArrayList<>();
- for (URI uri : prefixes) {
- final String bucket = uri.getAuthority();
- final String prefix = S3Utils.extractS3Key(uri);
+ for (final URI prefix : prefixes) {
+ final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
+ s3Client,
+ Collections.singletonList(prefix),
+ MAX_LISTING_LENGTH
+ );
- try {
- final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
- s3Client,
- uri,
- MAX_LISTING_LENGTH
- );
- objects.addAll(Lists.newArrayList(objectSummaryIterator));
- }
- catch (AmazonS3Exception outerException) {
- log.error(outerException, "Exception while listing on %s", uri);
-
- if (outerException.getStatusCode() == 403) {
- // The "Access Denied" means users might not have a proper permission for listing on the given uri.
- // Usually this is not a problem, but the uris might be the full paths to input objects instead of prefixes.
- // In this case, users should be able to get objects if they have a proper permission for GetObject.
-
- log.warn("Access denied for %s. Try to get the object from the uri without listing", uri);
- try {
- final ObjectMetadata objectMetadata = s3Client.getObjectMetadata(bucket, prefix);
-
- if (!S3Utils.isDirectoryPlaceholder(prefix, objectMetadata)) {
- objects.add(S3Utils.getSingleObjectSummary(s3Client, bucket, prefix));
- } else {
- throw new IOE(
- "[%s] is a directory placeholder, "
- + "but failed to get the object list under the directory due to permission",
- uri
- );
- }
- }
- catch (AmazonS3Exception innerException) {
- throw new IOException(innerException);
- }
- } else {
- throw new IOException(outerException);
- }
- }
+ objectSummaryIterator.forEachRemaining(objects::add);
}
return objects.stream().map(S3Utils::summaryToUri).collect(Collectors.toList());
}
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java
new file mode 100644
index 00000000000..0e791cd40e5
--- /dev/null
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3;
+
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.apache.druid.java.util.common.RE;
+
+import java.net.URI;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator class used by {@link S3Utils#objectSummaryIterator}.
+ *
+ * As required by the specification of that method, this iterator is computed incrementally in batches of
+ * {@code maxListingLength}. The first call is made at the same time the iterator is constructed.
+ */
+public class ObjectSummaryIterator implements Iterator<S3ObjectSummary>
+{
+ private final ServerSideEncryptingAmazonS3 s3Client;
+ private final Iterator<URI> prefixesIterator;
+ private final int maxListingLength;
+
+ private ListObjectsV2Request request;
+ private ListObjectsV2Result result;
+ private Iterator<S3ObjectSummary> objectSummaryIterator;
+ private S3ObjectSummary currentObjectSummary;
+
+ ObjectSummaryIterator(
+ final ServerSideEncryptingAmazonS3 s3Client,
+ final Iterable<URI> prefixes,
+ final int maxListingLength
+ )
+ {
+ this.s3Client = s3Client;
+ this.prefixesIterator = prefixes.iterator();
+ this.maxListingLength = maxListingLength;
+
+ prepareNextRequest();
+ fetchNextBatch();
+ advanceObjectSummary();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentObjectSummary != null;
+ }
+
+ @Override
+ public S3ObjectSummary next()
+ {
+ if (currentObjectSummary == null) {
+ throw new NoSuchElementException();
+ }
+
+ final S3ObjectSummary retVal = currentObjectSummary;
+ advanceObjectSummary();
+ return retVal;
+ }
+
+ private void prepareNextRequest()
+ {
+ final URI currentUri = prefixesIterator.next();
+ final String currentBucket = currentUri.getAuthority();
+ final String currentPrefix = S3Utils.extractS3Key(currentUri);
+
+ request = new ListObjectsV2Request()
+ .withBucketName(currentBucket)
+ .withPrefix(currentPrefix)
+ .withMaxKeys(maxListingLength);
+ }
+
+ private void fetchNextBatch()
+ {
+ try {
+ result = S3Utils.retryS3Operation(() -> s3Client.listObjectsV2(request));
+ request.setContinuationToken(result.getNextContinuationToken());
+ objectSummaryIterator = result.getObjectSummaries().iterator();
+ }
+ catch (AmazonS3Exception e) {
+ throw new RE(
+ e,
+ "Failed to get object summaries from S3 bucket[%s], prefix[%s]; S3 error: %s",
+ request.getBucketName(),
+ request.getPrefix(),
+ e.getMessage()
+ );
+ }
+ catch (Exception e) {
+ throw new RE(
+ e,
+ "Failed to get object summaries from S3 bucket[%s], prefix[%s]",
+ request.getBucketName(),
+ request.getPrefix()
+ );
+ }
+ }
+
+ /**
+ * Advance objectSummaryIterator to the next non-placeholder, updating "currentObjectSummary".
+ */
+ private void advanceObjectSummary()
+ {
+ while (objectSummaryIterator.hasNext() || result.isTruncated() || prefixesIterator.hasNext()) {
+ while (objectSummaryIterator.hasNext()) {
+ currentObjectSummary = objectSummaryIterator.next();
+
+ if (!isDirectoryPlaceholder(currentObjectSummary)) {
+ return;
+ }
+ }
+
+ // Exhausted "objectSummaryIterator" without finding a non-placeholder.
+ if (result.isTruncated()) {
+ fetchNextBatch();
+ } else if (prefixesIterator.hasNext()) {
+ prepareNextRequest();
+ fetchNextBatch();
+ }
+ }
+
+ // Truly nothing left to read.
+ currentObjectSummary = null;
+ }
+
+ /**
+ * Checks if a given object is a directory placeholder and should be ignored.
+ *
+ * Adapted from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder(). Does not include the check for
+ * legacy JetS3t directory placeholder objects, since it is based on content-type, which isn't available in an
+ * S3ObjectSummary.
+ */
+ private static boolean isDirectoryPlaceholder(final S3ObjectSummary objectSummary)
+ {
+ // Recognize "standard" directory place-holder indications used by Amazon's AWS Console and Panic's Transmit.
+ if (objectSummary.getKey().endsWith("/") && objectSummary.getSize() == 0) {
+ return true;
+ }
+
+ // Recognize s3sync.rb directory placeholders by MD5/ETag value.
+ if ("d66759af42f282e1ba19144df2d405d0".equals(objectSummary.getETag())) {
+ return true;
+ }
+
+ // Recognize place-holder objects created by the Google Storage console or S3 Organizer Firefox extension.
+ if (objectSummary.getKey().endsWith("_$folder$") && objectSummary.getSize() == 0) {
+ return true;
+ }
+
+ return false;
+ }
+}
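
The prepareNextRequest/fetchNextBatch pair above wraps the standard ListObjectsV2 continuation-token protocol from the AWS SDK. For readers less familiar with that API, here is a rough sketch of the equivalent hand-rolled paging loop for a single bucket and prefix (no retries and no placeholder filtering, unlike the iterator above; the helper class and names are illustrative):

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;

class ListObjectsV2PagingSketch
{
  // Lists every key under "prefix", fetching at most "maxKeys" summaries per request.
  static void listAll(AmazonS3 s3, String bucket, String prefix, int maxKeys)
  {
    final ListObjectsV2Request request = new ListObjectsV2Request()
        .withBucketName(bucket)
        .withPrefix(prefix)
        .withMaxKeys(maxKeys);

    ListObjectsV2Result result;
    do {
      result = s3.listObjectsV2(request);
      for (S3ObjectSummary summary : result.getObjectSummaries()) {
        System.out.println(summary.getKey());
      }
      // A truncated result carries the token that requests the next page.
      request.setContinuationToken(result.getNextContinuationToken());
    } while (result.isTruncated());
  }
}
```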
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinder.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinder.java
index 075d94bb464..b7f3b6e84cf 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinder.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinder.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.net.URI;
+import java.util.Collections;
import java.util.Iterator;
import java.util.regex.Pattern;
@@ -62,7 +63,7 @@ public class S3TimestampVersionedDataFinder extends S3DataSegmentPuller implemen
URI latest = null;
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
s3Client,
- uri,
+ Collections.singletonList(uri),
MAX_LISTING_KEYS
);
while (objectSummaryIterator.hasNext()) {
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
index ae736770918..eb704ed819e 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
@@ -26,16 +26,13 @@ import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
-import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.RetryUtils.Task;
import org.apache.druid.java.util.common.StringUtils;
@@ -45,7 +42,6 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
-import java.util.NoSuchElementException;
/**
*
@@ -54,7 +50,6 @@ public class S3Utils
{
private static final String SCHEME = S3StorageDruidModule.SCHEME;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
- private static final String MIMETYPE_JETS3T_DIRECTORY = "application/x-directory";
private static final Logger log = new Logger(S3Utils.class);
@@ -111,142 +106,24 @@ public class S3Utils
}
}
+ /**
+ * Create an iterator over a set of S3 objects specified by a set of prefixes.
+ *
+ * For each provided prefix URI, the iterator will walk through all objects that are in the same bucket as the
+ * provided URI and whose keys start with that URI's path, except for directory placeholders (which will be
+ * ignored). The iterator is computed incrementally by calling {@link ServerSideEncryptingAmazonS3#listObjectsV2} for
+ * each prefix in batches of {@code maxListingLength}. The first call is made at the same time the iterator is
+ * constructed.
+ */
public static Iterator<S3ObjectSummary> objectSummaryIterator(
final ServerSideEncryptingAmazonS3 s3Client,
- final URI prefix,
- final int numMaxKeys
- )
- {
- return lazyFetchingObjectSummariesIterator(s3Client, Iterators.singletonIterator(prefix), numMaxKeys);
- }
-
- /**
- * Create an iterator over a set of s3 objects specified by a set of 'prefixes' which may be paths or individual
- * objects, in order to get {@link S3ObjectSummary} for each discovered object. This iterator is computed lazily as it
- * is iterated, calling {@link ServerSideEncryptingAmazonS3#listObjectsV2} for each prefix in batches of
- * {@param maxListLength}, falling back to {@link ServerSideEncryptingAmazonS3#getObjectMetadata} if the list API
- * returns a 403 status code as a fallback to check if the URI is a single object instead of a directory. These
- * summaries are supplied to the outer iterator until drained, then if additional results for the current prefix are
- * still available, it will continue fetching and repeat the process, else it will move on to the next prefix,
- * continuing until all objects have been evaluated.
- */
- public static Iterator<S3ObjectSummary> lazyFetchingObjectSummariesIterator(
- final ServerSideEncryptingAmazonS3 s3Client,
- final Iterator<URI> uris,
+ final Iterable<URI> prefixes,
final int maxListingLength
)
{
- return new Iterator<S3ObjectSummary>()
- {
- private ListObjectsV2Request request;
- private ListObjectsV2Result result;
- private URI currentUri;
- private String currentBucket;
- private String currentPrefix;
- private Iterator<S3ObjectSummary> objectSummaryIterator;
-
- {
- prepareNextRequest();
- fetchNextBatch();
- }
-
- private void prepareNextRequest()
- {
- currentUri = uris.next();
- currentBucket = currentUri.getAuthority();
- currentPrefix = S3Utils.extractS3Key(currentUri);
-
- request = new ListObjectsV2Request()
- .withBucketName(currentBucket)
- .withPrefix(currentPrefix)
- .withMaxKeys(maxListingLength);
- }
-
- private void fetchNextBatch()
- {
- try {
- result = S3Utils.retryS3Operation(() -> s3Client.listObjectsV2(request));
- objectSummaryIterator = result.getObjectSummaries().iterator();
- request.setContinuationToken(result.getContinuationToken());
- }
- catch (AmazonS3Exception outerException) {
- log.error(outerException, "Exception while listing on %s", currentUri);
-
- if (outerException.getStatusCode() == 403) {
- // The "Access Denied" means users might not have a proper permission for listing on the given uri.
- // Usually this is not a problem, but the uris might be the full paths to input objects instead of prefixes.
- // In this case, users should be able to get objects if they have a proper permission for GetObject.
-
- log.warn("Access denied for %s. Try to get the object from the uri without listing", currentUri);
- try {
- final ObjectMetadata objectMetadata =
- S3Utils.retryS3Operation(() -> s3Client.getObjectMetadata(currentBucket, currentPrefix));
-
- if (!S3Utils.isDirectoryPlaceholder(currentPrefix, objectMetadata)) {
- // it's not a directory, so just generate an object summary
- S3ObjectSummary fabricated = new S3ObjectSummary();
- fabricated.setBucketName(currentBucket);
- fabricated.setKey(currentPrefix);
- objectSummaryIterator = Iterators.singletonIterator(fabricated);
- } else {
- throw new RE(
- "[%s] is a directory placeholder, "
- + "but failed to get the object list under the directory due to permission",
- currentUri
- );
- }
- }
- catch (Exception innerException) {
- throw new RuntimeException(innerException);
- }
- } else {
- throw new RuntimeException(outerException);
- }
- }
- catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- public boolean hasNext()
- {
- return objectSummaryIterator.hasNext() || result.isTruncated() || uris.hasNext();
- }
-
- @Override
- public S3ObjectSummary next()
- {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- if (objectSummaryIterator.hasNext()) {
- return objectSummaryIterator.next();
- }
-
- if (result.isTruncated()) {
- fetchNextBatch();
- } else if (uris.hasNext()) {
- prepareNextRequest();
- fetchNextBatch();
- }
-
- if (!objectSummaryIterator.hasNext()) {
- throw new ISE(
- "Failed to further iterate on bucket[%s] and prefix[%s]. The last continuationToken was [%s]",
- currentBucket,
- currentPrefix,
- result.getContinuationToken()
- );
- }
-
- return objectSummaryIterator.next();
- }
- };
+ return new ObjectSummaryIterator(s3Client, prefixes, maxListingLength);
}
-
/**
* Create an {@link URI} from the given {@link S3ObjectSummary}. The result URI is composed as below.
*
@@ -292,34 +169,6 @@ public class S3Utils
return CloudObjectLocation.validateUriScheme(SCHEME, uri);
}
- // Copied from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder()
- public static boolean isDirectoryPlaceholder(String key, ObjectMetadata objectMetadata)
- {
- // Recognize "standard" directory place-holder indications used by
- // Amazon's AWS Console and Panic's Transmit.
- if (key.endsWith("/") && objectMetadata.getContentLength() == 0) {
- return true;
- }
- // Recognize s3sync.rb directory placeholders by MD5/ETag value.
- if ("d66759af42f282e1ba19144df2d405d0".equals(objectMetadata.getETag())) {
- return true;
- }
- // Recognize place-holder objects created by the Google Storage console
- // or S3 Organizer Firefox extension.
- if (key.endsWith("_$folder$") && objectMetadata.getContentLength() == 0) {
- return true;
- }
-
- // We don't use JetS3t APIs anymore, but the below check is still needed for backward compatibility.
-
- // Recognize legacy JetS3t directory place-holder objects, only gives
- // accurate results if an object's metadata is populated.
- if (objectMetadata.getContentLength() == 0 && MIMETYPE_JETS3T_DIRECTORY.equals(objectMetadata.getContentType())) {
- return true;
- }
- return false;
- }
-
/**
* Gets a single {@link S3ObjectSummary} from s3. Since this method might return a wrong object if there are multiple
* objects that match the given key, this method should be used only when it's guaranteed that the given key is unique
@@ -354,10 +203,10 @@ public class S3Utils
/**
* Uploads a file to S3 if possible. First trying to set ACL to give the bucket owner full control of the file before uploading.
*
- * @param service S3 client
+ * @param service S3 client
* @param disableAcl true if ACL shouldn't be set for the file
- * @param key The key under which to store the new object.
- * @param file The path of the file to upload to Amazon S3.
+ * @param key The key under which to store the new object.
+ * @param file The path of the file to upload to Amazon S3.
*/
static void uploadFileIfPossible(
ServerSideEncryptingAmazonS3 service,
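
The `summaryToUri` contract relied on throughout this patch (and exercised by ObjectSummaryIteratorTest below) composes the URI as `s3://<bucket>/<key>`. A small sketch, with illustrative values:

```java
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.druid.storage.s3.S3Utils;

import java.net.URI;

class SummaryToUriSketch
{
  static URI example()
  {
    final S3ObjectSummary summary = new S3ObjectSummary();
    summary.setBucketName("b");
    summary.setKey("foo/baz");
    // Expected to equal URI.create("s3://b/foo/baz"), matching the test fixtures below.
    return S3Utils.summaryToUri(summary);
  }
}
```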
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index 1c1bc019912..4cfca195d59 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -21,13 +21,10 @@ package org.apache.druid.data.input.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
-import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.core.JsonParser;
@@ -62,10 +59,13 @@ import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.utils.CompressionUtils;
import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
@@ -81,7 +81,7 @@ import java.util.stream.Stream;
public class S3InputSourceTest extends InitializedNullHandlingTest
{
private static final ObjectMapper MAPPER = createS3ObjectMapper();
- private static final AmazonS3Client S3_CLIENT = EasyMock.createNiceMock(AmazonS3Client.class);
+ private static final AmazonS3Client S3_CLIENT = EasyMock.createMock(AmazonS3Client.class);
private static final ServerSideEncryptingAmazonS3 SERVICE = new ServerSideEncryptingAmazonS3(
S3_CLIENT,
new NoopServerSideEncryption()
@@ -165,7 +165,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
}
@Test
- public void testSerdeWithInvalidArgs() throws Exception
+ public void testSerdeWithInvalidArgs()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
@@ -220,8 +220,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
public void testWithPrefixesSplit()
{
EasyMock.reset(S3_CLIENT);
- addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
- addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
+ expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
+ expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(SERVICE, null, PREFIXES, null);
@@ -232,14 +232,15 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
+ EasyMock.verify(S3_CLIENT);
}
@Test
- public void testWithPrefixesWhereOneIsUrisAndNoListPermissionSplit()
+ public void testAccessDeniedWhileListingPrefix()
{
EasyMock.reset(S3_CLIENT);
- addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
- addExpectedNonPrefixObjectsWithNoListPermission();
+ expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
+ expectListObjectsAndThrowAccessDenied(EXPECTED_URIS.get(1));
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
@@ -249,22 +250,25 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null
);
- Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
- new JsonInputFormat(JSONPathSpec.DEFAULT, null),
- null
+ expectedException.expectMessage("Failed to get object summaries from S3 bucket[bar], prefix[foo/file2.csv]");
+ expectedException.expectCause(
+ ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("can't list that bucket"))
);
- Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
+ inputSource.createSplits(
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null),
+ null
+ ).collect(Collectors.toList());
}
@Test
public void testReader() throws IOException
{
EasyMock.reset(S3_CLIENT);
- addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
- addExpectedNonPrefixObjectsWithNoListPermission();
- addExpectedGetObjectMock(EXPECTED_URIS.get(0));
- addExpectedGetObjectMock(EXPECTED_URIS.get(1));
+ expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
+ expectListObjects(EXPECTED_URIS.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
+ expectGetObject(EXPECTED_URIS.get(0));
+ expectGetObject(EXPECTED_URIS.get(1));
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
@@ -294,16 +298,18 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
+
+ EasyMock.verify(S3_CLIENT);
}
@Test
public void testCompressedReader() throws IOException
{
EasyMock.reset(S3_CLIENT);
- addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)));
- addExpectedNonPrefixObjectsWithNoListPermission();
- addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(0));
- addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(1));
+ expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)));
+ expectListObjects(EXPECTED_COMPRESSED_URIS.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)));
+ expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(0));
+ expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(1));
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
@@ -333,40 +339,39 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
+
+ EasyMock.verify(S3_CLIENT);
}
- private static void addExpectedPrefixObjects(URI prefix, List<URI> uris)
+ private static void expectListObjects(URI prefix, List<URI> uris)
{
- final String s3Bucket = prefix.getAuthority();
final ListObjectsV2Result result = new ListObjectsV2Result();
- result.setBucketName(s3Bucket);
- result.setKeyCount(1);
+ result.setBucketName(prefix.getAuthority());
+ result.setKeyCount(uris.size());
for (URI uri : uris) {
+ final String bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
final S3ObjectSummary objectSummary = new S3ObjectSummary();
- objectSummary.setBucketName(s3Bucket);
+ objectSummary.setBucketName(bucket);
objectSummary.setKey(key);
result.getObjectSummaries().add(objectSummary);
}
- EasyMock.expect(S3_CLIENT.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))).andReturn(result).once();
+
+ EasyMock.expect(
+ S3_CLIENT.listObjectsV2(matchListObjectsRequest(prefix))
+ ).andReturn(result).once();
}
- private static void addExpectedNonPrefixObjectsWithNoListPermission()
+ private static void expectListObjectsAndThrowAccessDenied(final URI prefix)
{
AmazonS3Exception boom = new AmazonS3Exception("oh dang, you can't list that bucket friend");
boom.setStatusCode(403);
- EasyMock.expect(S3_CLIENT.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))).andThrow(boom).once();
-
- ObjectMetadata metadata = new ObjectMetadata();
- metadata.setContentLength(CONTENT.length);
- metadata.setContentEncoding("text/csv");
- metadata.setHeader(Headers.ETAG, "some-totally-real-etag-base64-hash-i-guess");
- EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject(GetObjectMetadataRequest.class)))
- .andReturn(metadata)
- .once();
+ EasyMock.expect(
+ S3_CLIENT.listObjectsV2(matchListObjectsRequest(prefix))
+ ).andThrow(boom).once();
}
- private static void addExpectedGetObjectMock(URI uri)
+ private static void expectGetObject(URI uri)
{
final String s3Bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
@@ -378,7 +383,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).once();
}
- private static void addExpectedGetCompressedObjectMock(URI uri) throws IOException
+ private static void expectGetObjectCompressed(URI uri) throws IOException
{
final String s3Bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
@@ -392,6 +397,35 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).once();
}
+ private static ListObjectsV2Request matchListObjectsRequest(final URI prefixUri)
+ {
+ // Use an IArgumentMatcher to verify that the request has the correct bucket and prefix.
+ EasyMock.reportMatcher(
+ new IArgumentMatcher()
+ {
+ @Override
+ public boolean matches(Object argument)
+ {
+ if (!(argument instanceof ListObjectsV2Request)) {
+ return false;
+ }
+
+ final ListObjectsV2Request request = (ListObjectsV2Request) argument;
+ return prefixUri.getAuthority().equals(request.getBucketName())
+ && S3Utils.extractS3Key(prefixUri).equals(request.getPrefix());
+ }
+
+ @Override
+ public void appendTo(StringBuffer buffer)
+ {
+ buffer.append("<request for prefix ").append(prefixUri).append(">");
+ }
+ }
+ );
+
+ return null;
+ }
+
public static ObjectMapper createS3ObjectMapper()
{
DruidModule baseModule = new TestS3Module();
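
A note on the EasyMock idiom in `matchListObjectsRequest` above, since it can look surprising: `EasyMock.reportMatcher` registers the custom matcher for the next recorded argument position, and the returned `null` is only a placeholder for that argument. A hypothetical expectation using the helper (the request URI and returned result here are illustrative):

```java
// Sketch: the helper stands in for the ListObjectsV2Request argument while recording.
EasyMock.reset(S3_CLIENT);
EasyMock.expect(S3_CLIENT.listObjectsV2(matchListObjectsRequest(URI.create("s3://bar/foo/"))))
        .andReturn(new ListObjectsV2Result())
        .once();
EasyMock.replay(S3_CLIENT);
```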
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java
new file mode 100644
index 00000000000..d63409cee19
--- /dev/null
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3;
+
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ObjectSummaryIteratorTest
+{
+ private static final ImmutableList<S3ObjectSummary> TEST_OBJECTS =
+ ImmutableList.of(
+ makeObjectSummary("b", "foo", 10L),
+ makeObjectSummary("b", "foo/", 0L),
+ makeObjectSummary("b", "foo/bar1", 10L),
+ makeObjectSummary("b", "foo/bar2", 10L),
+ makeObjectSummary("b", "foo/bar3", 10L),
+ makeObjectSummary("b", "foo/bar4", 10L),
+ makeObjectSummary("b", "foo/baz", 10L),
+ makeObjectSummary("bucketnotmine", "a/different/bucket", 10L)
+ );
+
+ @Test
+ public void testSingleObject()
+ {
+ test(
+ ImmutableList.of("s3://b/foo/baz"),
+ ImmutableList.of("s3://b/foo/baz"),
+ 5
+ );
+ }
+
+ @Test
+ public void testMultiObjectOneKeyAtATime()
+ {
+ test(
+ ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4", "s3://b/foo/baz"),
+ ImmutableList.of("s3://b/foo/"),
+ 1
+ );
+ }
+
+ @Test
+ public void testMultiObjectTwoKeysAtATime()
+ {
+ test(
+ ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4", "s3://b/foo/baz"),
+ ImmutableList.of("s3://b/foo/"),
+ 2
+ );
+ }
+
+ @Test
+ public void testMultiObjectTenKeysAtATime()
+ {
+ test(
+ ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4", "s3://b/foo/baz"),
+ ImmutableList.of("s3://b/foo/"),
+ 10
+ );
+ }
+
+ @Test
+ public void testPrefixInMiddleOfKey()
+ {
+ test(
+ ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4"),
+ ImmutableList.of("s3://b/foo/bar"),
+ 10
+ );
+ }
+
+ @Test
+ public void testNoPath()
+ {
+ test(
+ ImmutableList.of(
+ "s3://b/foo",
+ "s3://b/foo/bar1",
+ "s3://b/foo/bar2",
+ "s3://b/foo/bar3",
+ "s3://b/foo/bar4",
+ "s3://b/foo/baz"
+ ),
+ ImmutableList.of("s3://b"),
+ 10
+ );
+ }
+
+ @Test
+ public void testSlashPath()
+ {
+ test(
+ ImmutableList.of(
+ "s3://b/foo",
+ "s3://b/foo/bar1",
+ "s3://b/foo/bar2",
+ "s3://b/foo/bar3",
+ "s3://b/foo/bar4",
+ "s3://b/foo/baz"
+ ),
+ ImmutableList.of("s3://b/"),
+ 10
+ );
+ }
+
+ @Test
+ public void testDifferentBucket()
+ {
+ test(
+ ImmutableList.of(),
+ ImmutableList.of("s3://bx/foo/"),
+ 10
+ );
+ }
+
+ private static void test(
+ final List<String> expectedUris,
+ final List<String> prefixes,
+ final int maxListingLength
+ )
+ {
+ final List<S3ObjectSummary> expectedObjects = new ArrayList<>();
+
+ // O(N^2) but who cares -- the list is short.
+ for (final String uri : expectedUris) {
+ final List<S3ObjectSummary> matches = TEST_OBJECTS.stream()
+ .filter(
+ summary ->
+ S3Utils.summaryToUri(summary).toString().equals(uri)
+ )
+ .collect(Collectors.toList());
+
+ expectedObjects.add(Iterables.getOnlyElement(matches));
+ }
+
+ final List<S3ObjectSummary> actualObjects = ImmutableList.copyOf(
+ S3Utils.objectSummaryIterator(
+ makeMockClient(TEST_OBJECTS),
+ prefixes.stream().map(URI::create).collect(Collectors.toList()),
+ maxListingLength
+ )
+ );
+
+ Assert.assertEquals(
+ prefixes.toString(),
+ expectedObjects.stream().map(S3Utils::summaryToUri).collect(Collectors.toList()),
+ actualObjects.stream().map(S3Utils::summaryToUri).collect(Collectors.toList())
+ );
+ }
+
+ /**
+ * Makes a mock S3 client that handles enough of "listObjectsV2" to test the functionality of the
+ * {@link ObjectSummaryIterator} class.
+ */
+ private static ServerSideEncryptingAmazonS3 makeMockClient(
+ final List<S3ObjectSummary> objects
+ )
+ {
+ return new ServerSideEncryptingAmazonS3(null, null)
+ {
+ @Override
+ public ListObjectsV2Result listObjectsV2(final ListObjectsV2Request request)
+ {
+ // Continuation token is an index in the "objects" list.
+ final String continuationToken = request.getContinuationToken();
+ final int startIndex = continuationToken == null ? 0 : Integer.parseInt(continuationToken);
+
+ // Find matching objects.
+ final List<S3ObjectSummary> summaries = new ArrayList<>();
+ int nextIndex = -1;
+
+ for (int i = startIndex; i < objects.size(); i++) {
+ final S3ObjectSummary summary = objects.get(i);
+
+ if (summary.getBucketName().equals(request.getBucketName())
+ && summary.getKey().startsWith(request.getPrefix())) {
+
+ if (summaries.size() == request.getMaxKeys()) {
+ // We reached our max key limit; set nextIndex (which will lead to a result with truncated = true).
+ nextIndex = i;
+ break;
+ }
+
+ // Generate a summary.
+ summaries.add(summary);
+ }
+ }
+
+ // Generate the result.
+ final ListObjectsV2Result retVal = new ListObjectsV2Result();
+ retVal.setContinuationToken(continuationToken);
+ retVal.getObjectSummaries().addAll(summaries);
+
+ if (nextIndex >= 0) {
+ retVal.setTruncated(true);
+ retVal.setNextContinuationToken(String.valueOf(nextIndex));
+ }
+
+ return retVal;
+ }
+ };
+ }
+
+ private static S3ObjectSummary makeObjectSummary(final String bucket, final String key, final long size)
+ {
+ final S3ObjectSummary summary = new S3ObjectSummary();
+ summary.setBucketName(bucket);
+ summary.setKey(key);
+ summary.setSize(size);
+ return summary;
+ }
+}