S3 input source (#8903)

* add s3 input source for native batch ingestion

* add docs

* fixes

* checkstyle

* lazy splits

* fixes and hella tests

* fix it

* re-use better iterator

* use key

* javadoc and checkstyle

* exception

* oops

* refactor to use S3Coords instead of URI

* remove unused code, add retrying stream to handle s3 stream

* remove unused parameter

* update to latest master

* use list of objects instead of object

* serde test

* refactor and such

* now with the ability to compile

* fix signature and javadocs

* fix conflicts yet again, fix S3 uri stuffs

* more tests, enforce uri for bucket

* javadoc

* oops

* abstract class instead of interface

* null or empty

* better error
Clint Wylie 2019-11-25 22:31:19 -08:00 committed by GitHub
parent 282b838b3f
commit 4458113375
30 changed files with 1397 additions and 291 deletions

View File

@ -75,9 +75,5 @@ public interface InputSource
* @param inputFormat to parse data. It can be null if {@link #needsFormat()} = true
* @param temporaryDirectory to store temp data. It will be cleaned up automatically once the task is finished.
*/
InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
File temporaryDirectory
);
InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory);
}

View File

@ -19,47 +19,58 @@
package org.apache.druid.data.input;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.utils.CompressionUtils;
import java.io.IOException;
import java.io.InputStream;
public interface RetryingInputEntity extends InputEntity
public abstract class RetryingInputEntity implements InputEntity
{
/**
* Open a {@link RetryingInputStream} wrapper for an underlying input stream, optionally decompressing the retrying
* stream if the file extension matches a known compression format, otherwise passing through the retrying stream directly.
*/
@Override
default InputStream open() throws IOException
public InputStream open() throws IOException
{
return new RetryingInputStream<>(
RetryingInputStream<?> retryingInputStream = new RetryingInputStream<>(
this,
new RetryingInputEntityOpenFunction(),
getRetryCondition(),
RetryUtils.DEFAULT_MAX_TRIES
);
return CompressionUtils.decompress(retryingInputStream, getPath());
}
/**
* Directly opens an {@link InputStream} on the input entity.
* Directly opens an {@link InputStream} on the input entity. Decompression should be handled externally, and is
* handled by the default implementation of {@link #open}, so this should return the raw stream for the object.
*/
default InputStream readFromStart() throws IOException
protected InputStream readFromStart() throws IOException
{
return readFrom(0);
}
/**
* Directly opens an {@link InputStream} starting at the given offset on the input entity.
* Directly opens an {@link InputStream} starting at the given offset on the input entity. Decompression should be
* handled externally, and is handled by the default implementation of {@link #open}, so this should return the raw stream
* for the object.
*
* @param offset an offset to start reading from. A non-negative integer counting
* the number of bytes from the beginning of the entity
*/
InputStream readFrom(long offset) throws IOException;
protected abstract InputStream readFrom(long offset) throws IOException;
@Override
Predicate<Throwable> getRetryCondition();
/**
* Get the path name for this entity, used by the default implementation of {@link #open} to determine whether the
* underlying stream needs to be decompressed, based on the file extension of the path
*/
protected abstract String getPath();
class RetryingInputEntityOpenFunction implements ObjectOpenFunction<RetryingInputEntity>
private static class RetryingInputEntityOpenFunction implements ObjectOpenFunction<RetryingInputEntity>
{
@Override
public InputStream open(RetryingInputEntity object) throws IOException

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
import java.net.URI;
import java.util.Objects;
/**
* Common type for the 'bucket' and 'path' concepts of cloud objects, allowing code sharing between cloud-specific
* implementations. {@link #bucket} and {@link #path} should NOT be URL encoded.
*
* The intention is that this is used as a common representation for storage objects as an alternative to dealing in
* {@link URI} directly, but still provide a mechanism to round-trip with a URI.
*
* In common clouds, bucket names must be DNS compliant:
* https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
* https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata
* https://cloud.google.com/storage/docs/naming
*
* The constructor ensures that bucket names are DNS compliant by checking that the URL encoded form of the bucket
* matches the supplied value. Technically it should probably also confirm that the bucket is all lower-case, but
* S3 has a legacy mode where buckets did not have to be compliant, so unfortunately we can't enforce that here.
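*
* A brief usage sketch (the bucket and object names are illustrative, not taken from this change):
* <pre>{@code
* CloudObjectLocation location = new CloudObjectLocation("my-bucket", "path/to/object.json");
* URI uri = location.toUri("s3");                                  // s3://my-bucket/path/to/object.json
* CloudObjectLocation roundTripped = new CloudObjectLocation(uri); // equal to the original location
* }</pre>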
*/
public class CloudObjectLocation
{
private final String bucket;
private final String path;
@JsonCreator
public CloudObjectLocation(@JsonProperty("bucket") String bucket, @JsonProperty("path") String path)
{
this.bucket = Preconditions.checkNotNull(StringUtils.maybeRemoveTrailingSlash(bucket));
this.path = Preconditions.checkNotNull(StringUtils.maybeRemoveLeadingSlash(path));
Preconditions.checkArgument(
this.bucket.equals(StringUtils.urlEncode(this.bucket)),
"bucket must follow DNS-compliant naming conventions"
);
}
public CloudObjectLocation(URI uri)
{
this(uri.getHost(), uri.getPath());
}
/**
* Given a scheme, encode {@link #bucket} and {@link #path} into a {@link URI}.
*
* In all clouds, bucket names must be DNS compliant, so the bucket does not require encoding.
* There is no such restriction on object names, so they will be URL encoded when constructing the URI.
*/
public URI toUri(String scheme)
{
// Encode path, except leave '/' characters unencoded
return URI.create(
StringUtils.format(
"%s://%s/%s",
scheme,
bucket,
StringUtils.replace(StringUtils.urlEncode(path), "%2F", "/")
)
);
}
@JsonProperty
public String getBucket()
{
return bucket;
}
@JsonProperty
public String getPath()
{
return path;
}
@Override
public String toString()
{
return "CloudObjectLocation{" +
"bucket='" + bucket + '\'' +
", path='" + path + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final CloudObjectLocation that = (CloudObjectLocation) o;
return Objects.equals(bucket, that.bucket) &&
Objects.equals(path, that.path);
}
@Override
public int hashCode()
{
return Objects.hash(bucket, path);
}
}

View File

@ -27,7 +27,6 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CompressionUtils;
import javax.annotation.Nullable;
import java.io.IOException;
@ -36,7 +35,7 @@ import java.net.URI;
import java.net.URLConnection;
import java.util.Base64;
public class HttpEntity implements RetryingInputEntity
public class HttpEntity extends RetryingInputEntity
{
private static final Logger LOG = new Logger(HttpEntity.class);
@ -64,12 +63,15 @@ public class HttpEntity implements RetryingInputEntity
}
@Override
public InputStream readFrom(long offset) throws IOException
protected InputStream readFrom(long offset) throws IOException
{
return CompressionUtils.decompress(
openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset),
uri.toString()
);
return openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset);
}
@Override
protected String getPath()
{
return uri.getPath();
}
@Override

View File

@ -237,6 +237,16 @@ public class StringUtils
}
}
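/**
* Removes a single leading '/' from the given string if present; a null or non-matching input is returned unchanged.
*/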
public static String maybeRemoveLeadingSlash(String s)
{
return s != null && s.startsWith("/") ? s.substring(1) : s;
}
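/**
* Removes a single trailing '/' from the given string if present; a null or non-matching input is returned unchanged.
*/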
public static String maybeRemoveTrailingSlash(String s)
{
return s != null && s.endsWith("/") ? s.substring(0, s.length() - 1) : s;
}
/**
* Removes all occurrences of the given char from the given string. This method is an optimal version of
* {@link String#replace(CharSequence, CharSequence) s.replace("c", "")}.

View File

@ -23,11 +23,13 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import org.apache.druid.java.util.common.ISE;
import javax.annotation.Nullable;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.TreeSet;
@ -116,6 +118,11 @@ public final class CollectionUtils
return result;
}
public static boolean isNullOrEmpty(@Nullable List<?> list)
{
return list == null || list.isEmpty();
}
private CollectionUtils()
{
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.net.URI;
public class CloudObjectLocationTest
{
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String SCHEME = "s3";
private static final String BUCKET_NAME = "bucket";
private static final CloudObjectLocation LOCATION =
new CloudObjectLocation(BUCKET_NAME, "path/to/myobject");
private static final CloudObjectLocation LOCATION_EXTRA_SLASHES =
new CloudObjectLocation(BUCKET_NAME + '/', "/path/to/myobject");
private static final CloudObjectLocation LOCATION_URLENCODE =
new CloudObjectLocation(BUCKET_NAME, "path/to/myobject?question");
private static final CloudObjectLocation LOCATION_NON_ASCII =
new CloudObjectLocation(BUCKET_NAME, "pÄth/tø/myøbject");
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerde() throws Exception
{
Assert.assertEquals(
LOCATION,
MAPPER.readValue(MAPPER.writeValueAsString(LOCATION), CloudObjectLocation.class)
);
Assert.assertEquals(
LOCATION_EXTRA_SLASHES,
MAPPER.readValue(MAPPER.writeValueAsString(LOCATION_EXTRA_SLASHES), CloudObjectLocation.class)
);
Assert.assertEquals(
LOCATION_URLENCODE,
MAPPER.readValue(MAPPER.writeValueAsString(LOCATION_URLENCODE), CloudObjectLocation.class)
);
Assert.assertEquals(
LOCATION_NON_ASCII,
MAPPER.readValue(MAPPER.writeValueAsString(LOCATION_NON_ASCII), CloudObjectLocation.class)
);
}
@Test
public void testToUri()
{
Assert.assertEquals(
URI.create("s3://bucket/path/to/myobject"),
LOCATION.toUri(SCHEME)
);
Assert.assertEquals(
URI.create("s3://bucket/path/to/myobject"),
LOCATION_EXTRA_SLASHES.toUri(SCHEME)
);
Assert.assertEquals(
URI.create("s3://bucket/path/to/myobject%3Fquestion"),
LOCATION_URLENCODE.toUri(SCHEME)
);
Assert.assertEquals(
URI.create("s3://bucket/p%C3%84th/t%C3%B8/my%C3%B8bject"),
LOCATION_NON_ASCII.toUri(SCHEME)
);
}
@Test
public void testUriRoundTrip()
{
Assert.assertEquals(LOCATION, new CloudObjectLocation(LOCATION.toUri(SCHEME)));
Assert.assertEquals(LOCATION_EXTRA_SLASHES, new CloudObjectLocation(LOCATION_EXTRA_SLASHES.toUri(SCHEME)));
Assert.assertEquals(LOCATION_URLENCODE, new CloudObjectLocation(LOCATION_URLENCODE.toUri(SCHEME)));
Assert.assertEquals(LOCATION_NON_ASCII, new CloudObjectLocation(LOCATION_NON_ASCII.toUri(SCHEME)));
}
@Test
public void testBucketName()
{
expectedException.expect(IllegalArgumentException.class);
CloudObjectLocation invalidBucket = new CloudObjectLocation("someBÜcket", "some/path");
// will never get here
Assert.assertEquals(invalidBucket, new CloudObjectLocation(invalidBucket.toUri(SCHEME)));
}
}

View File

@ -98,6 +98,82 @@ You can enable [server-side encryption](https://docs.aws.amazon.com/AmazonS3/lat
- kms: [Server-side encryption with AWS KMS-Managed Keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html)
- custom: [Server-side encryption with Customer-Provided Encryption Keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html)
<a name="input-source"></a>
## S3 batch ingestion input source
This extension also provides an input source for Druid native batch ingestion to support reading objects directly from S3. Objects can be specified as a list of S3 URI strings, as a list of S3 location prefixes (which will be listed so that all objects contained in those locations are ingested), or as a list of S3 objects given by bucket and path. The S3 input source is splittable and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task), where each worker task of `index_parallel` will read a single object.
Sample spec:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"prefixes": ["s3://foo/bar", "s3://bar/foo"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `s3`.|N/A|yes|
|uris|JSON array of URIs where S3 objects to be ingested are located.|N/A|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of S3 Objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set|
S3 Object:
|property|description|default|required?|
|--------|-----------|-------|---------|
|bucket|Name of the S3 bucket.|N/A|yes|
|path|The path where data is located.|N/A|yes|
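For example, the object `{"bucket": "foo", "path": "bar/file1.json"}` in the sample spec above identifies the same data as the URI `s3://foo/bar/file1.json`.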
<a name="firehose"></a>
## StaticS3Firehose

View File

@ -28,6 +28,7 @@ import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.azure.AzureByteSource;
import org.apache.druid.storage.azure.AzureStorage;
import org.apache.druid.storage.azure.AzureUtils;
@ -50,7 +51,7 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi
@JsonCreator
public StaticAzureBlobStoreFirehoseFactory(
@JacksonInject("azureStorage") AzureStorage azureStorage,
@JacksonInject AzureStorage azureStorage,
@JsonProperty("blobs") List<AzureBlob> blobs,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
@JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
@ -101,9 +102,7 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi
private static AzureByteSource makeByteSource(AzureStorage azureStorage, AzureBlob object)
{
final String container = object.getContainer();
final String path = object.getPath().startsWith("/")
? object.getPath().substring(1)
: object.getPath();
final String path = StringUtils.maybeRemoveLeadingSlash(object.getPath());
return new AzureByteSource(azureStorage, container, path);
}

View File

@ -50,7 +50,7 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho
@JsonCreator
public StaticCloudFilesFirehoseFactory(
@JacksonInject("objectApi") CloudFilesApi cloudFilesApi,
@JacksonInject CloudFilesApi cloudFilesApi,
@JsonProperty("blobs") List<CloudFilesBlob> blobs,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
@JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,

View File

@ -21,17 +21,17 @@ package org.apache.druid.data.input.google;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.google.GoogleByteSource;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleUtils;
import org.apache.druid.utils.CompressionUtils;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
public class GoogleCloudStorageEntity implements RetryingInputEntity
public class GoogleCloudStorageEntity extends RetryingInputEntity
{
private final GoogleStorage storage;
private final URI uri;
@ -50,13 +50,19 @@ public class GoogleCloudStorageEntity implements RetryingInputEntity
}
@Override
public InputStream readFrom(long offset) throws IOException
protected InputStream readFrom(long offset) throws IOException
{
// Get data of the given object and open an input stream
final String bucket = uri.getAuthority();
final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri);
final String key = StringUtils.maybeRemoveLeadingSlash(uri.getPath());
final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key);
return CompressionUtils.decompress(byteSource.openStream(offset), uri.getPath());
return byteSource.openStream(offset);
}
@Override
protected String getPath()
{
return StringUtils.maybeRemoveLeadingSlash(uri.getPath());
}
@Override

View File

@ -27,6 +27,7 @@ import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.google.GoogleByteSource;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleUtils;
@ -87,9 +88,7 @@ public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesF
private GoogleByteSource createGoogleByteSource(GoogleBlob object)
{
final String bucket = object.getBucket();
final String path = object.getPath().startsWith("/")
? object.getPath().substring(1)
: object.getPath();
final String path = StringUtils.maybeRemoveLeadingSlash(object.getPath());
return new GoogleByteSource(storage, bucket, path);
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.storage.google;
import com.google.common.base.Predicate;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.URIDataPuller;
@ -81,20 +82,14 @@ public class GoogleDataSegmentPuller implements URIDataPuller
@Override
public InputStream getInputStream(URI uri) throws IOException
{
String path = uri.getPath();
if (path.startsWith("/")) {
path = path.substring(1);
}
String path = StringUtils.maybeRemoveLeadingSlash(uri.getPath());
return storage.get(uri.getHost(), path);
}
@Override
public String getVersion(URI uri) throws IOException
{
String path = uri.getPath();
if (path.startsWith("/")) {
path = path.substring(1);
}
String path = StringUtils.maybeRemoveLeadingSlash(uri.getPath());
return storage.version(uri.getHost(), path);
}

View File

@ -23,7 +23,6 @@ import com.google.api.client.http.HttpResponseException;
import com.google.common.base.Predicate;
import java.io.IOException;
import java.net.URI;
public class GoogleUtils
{
@ -36,10 +35,5 @@ public class GoogleUtils
return t instanceof IOException;
}
public static String extractGoogleCloudStorageObjectKey(URI uri)
{
return uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath();
}
public static final Predicate<Throwable> GOOGLE_RETRY = e -> isRetryable(e);
}

View File

@ -22,7 +22,6 @@ package org.apache.druid.inputsource.hdfs;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
import org.apache.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@ -32,7 +31,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
public class HdfsInputEntity implements RetryingInputEntity
public class HdfsInputEntity extends RetryingInputEntity
{
private final Configuration conf;
private final Path path;
@ -50,12 +49,18 @@ public class HdfsInputEntity implements RetryingInputEntity
}
@Override
public InputStream readFrom(long offset) throws IOException
protected InputStream readFrom(long offset) throws IOException
{
final FileSystem fs = path.getFileSystem(conf);
final FSDataInputStream inputStream = fs.open(path);
inputStream.seek(offset);
return CompressionUtils.decompress(inputStream, path.getName());
return inputStream;
}
@Override
protected String getPath()
{
return path.getName();
}
@Override

View File

@ -113,8 +113,19 @@
<artifactId>aws-java-sdk-s3</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.s3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
public class S3Entity extends RetryingInputEntity
{
private final ServerSideEncryptingAmazonS3 s3Client;
private final CloudObjectLocation object;
S3Entity(ServerSideEncryptingAmazonS3 s3Client, CloudObjectLocation coords)
{
this.s3Client = s3Client;
this.object = coords;
}
@Override
public URI getUri()
{
return null;
}
@Override
protected InputStream readFrom(long offset) throws IOException
{
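// Ranged GET starting at the given offset; transient failures are retried by the RetryingInputEntity wrapper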
final GetObjectRequest request = new GetObjectRequest(object.getBucket(), object.getPath());
request.setRange(offset);
try {
final S3Object s3Object = s3Client.getObject(request);
if (s3Object == null) {
throw new ISE(
"Failed to get an s3 object for bucket[%s], key[%s], and start[%d]",
object.getBucket(),
object.getPath(),
offset
);
}
return s3Object.getObjectContent();
}
catch (AmazonS3Exception e) {
throw new IOException(e);
}
}
@Override
protected String getPath()
{
return object.getPath();
}
@Override
public Predicate<Throwable> getRetryCondition()
{
return S3Utils.S3RETRY;
}
}

View File

@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.s3;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class S3InputSource extends AbstractInputSource implements SplittableInputSource<CloudObjectLocation>
{
private static final int MAX_LISTING_LENGTH = 1024;
private final ServerSideEncryptingAmazonS3 s3Client;
private final List<URI> uris;
private final List<URI> prefixes;
private final List<CloudObjectLocation> objects;
@JsonCreator
public S3InputSource(
@JacksonInject ServerSideEncryptingAmazonS3 s3Client,
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
)
{
this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client");
this.uris = uris;
this.prefixes = prefixes;
this.objects = objects;
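// exactly one of 'objects', 'uris', or 'prefixes' may be specified; uris and prefixes must also use a valid S3 scheme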
if (!CollectionUtils.isNullOrEmpty(objects)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes));
} else if (!CollectionUtils.isNullOrEmpty(uris)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes));
uris.forEach(S3Utils::checkURI);
} else if (!CollectionUtils.isNullOrEmpty(prefixes)) {
prefixes.forEach(S3Utils::checkURI);
} else {
throwIfIllegalArgs(true);
}
}
@JsonProperty
public List<URI> getUris()
{
return uris;
}
@JsonProperty
public List<URI> getPrefixes()
{
return prefixes;
}
@JsonProperty
public List<CloudObjectLocation> getObjects()
{
return objects;
}
@Override
public Stream<InputSplit<CloudObjectLocation>> createSplits(
InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec
)
{
if (objects != null) {
return objects.stream().map(InputSplit::new);
}
if (uris != null) {
return uris.stream().map(CloudObjectLocation::new).map(InputSplit::new);
}
return StreamSupport.stream(getIterableObjectsFromPrefixes().spliterator(), false)
.map(S3Utils::summaryToCloudObjectLocation)
.map(InputSplit::new);
}
@Override
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
if (objects != null) {
return objects.size();
}
if (uris != null) {
return uris.size();
}
return (int) StreamSupport.stream(getIterableObjectsFromPrefixes().spliterator(), false).count();
}
@Override
public SplittableInputSource<CloudObjectLocation> withSplit(InputSplit<CloudObjectLocation> split)
{
return new S3InputSource(s3Client, null, null, ImmutableList.of(split.get()));
}
@Override
public boolean needsFormat()
{
return true;
}
@Override
protected InputSourceReader formattableReader(
InputRowSchema inputRowSchema,
InputFormat inputFormat,
@Nullable File temporaryDirectory
)
{
return new InputEntityIteratingReader(
inputRowSchema,
inputFormat,
// formattableReader() is supposed to be called in each task that actually creates segments.
// The task should already have only one split in parallel indexing,
// while there's no need to make splits using splitHintSpec in sequential indexing.
createSplits(inputFormat, null).map(split -> new S3Entity(s3Client, split.get())),
temporaryDirectory
);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
S3InputSource that = (S3InputSource) o;
return Objects.equals(uris, that.uris) &&
Objects.equals(prefixes, that.prefixes) &&
Objects.equals(objects, that.objects);
}
@Override
public int hashCode()
{
return Objects.hash(uris, prefixes, objects);
}
@Override
public String toString()
{
return "S3InputSource{" +
"uris=" + uris +
", prefixes=" + prefixes +
", objects=" + objects +
'}';
}
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
{
return () -> S3Utils.lazyFetchingObjectSummariesIterator(s3Client, prefixes.iterator(), MAX_LISTING_LENGTH);
}
private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException
{
if (clause) {
throw new IllegalArgumentException("exactly one of uris, prefixes, or objects must be specified");
}
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.s3;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.storage.s3.S3StorageDruidModule;
import java.util.List;
/**
* Druid module to wire up native batch support for S3 input
*/
public class S3InputSourceDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class, S3StorageDruidModule.SCHEME))
);
}
@Override
public void configure(Binder binder)
{
}
}

View File

@ -37,7 +37,6 @@ import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFa
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@ -68,7 +67,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
@JsonCreator
public StaticS3FirehoseFactory(
@JacksonInject("s3Client") ServerSideEncryptingAmazonS3 s3Client,
@JacksonInject ServerSideEncryptingAmazonS3 s3Client,
@JsonProperty("uris") List<URI> uris,
@JsonProperty("prefixes") List<URI> prefixes,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
@ -115,8 +114,6 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
@Override
protected Collection<URI> initObjects() throws IOException
{
// Here, the returned s3 objects contain minimal information without data.
// Getting data is deferred until openObjectStream() is called for each object.
if (!uris.isEmpty()) {
return uris;
} else {
@ -128,8 +125,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
try {
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
s3Client,
bucket,
prefix,
uri,
MAX_LISTING_LENGTH
);
objects.addAll(Lists.newArrayList(objectSummaryIterator));
@ -164,7 +160,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
}
}
}
return objects.stream().map(StaticS3FirehoseFactory::toUri).collect(Collectors.toList());
return objects.stream().map(S3Utils::summaryToUri).collect(Collectors.toList());
}
}
@ -273,23 +269,4 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
getMaxFetchRetry()
);
}
/**
* Create an {@link URI} from the given {@link S3ObjectSummary}. The result URI is composed as below.
*
* <pre>
* {@code s3://{BUCKET_NAME}/{OBJECT_KEY}}
* </pre>
*/
private static URI toUri(S3ObjectSummary object)
{
final String originalAuthority = object.getBucketName();
final String originalPath = object.getKey();
final String authority = originalAuthority.endsWith("/") ?
originalAuthority.substring(0, originalAuthority.length() - 1) :
originalAuthority;
final String path = originalPath.startsWith("/") ? originalPath.substring(1) : originalPath;
return URI.create(StringUtils.format("s3://%s/%s", authority, path));
}
}

View File

@ -28,6 +28,7 @@ import com.google.common.base.Strings;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.inject.Inject;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
@ -55,13 +56,9 @@ import java.net.URI;
*/
public class S3DataSegmentPuller implements URIDataPuller
{
public static final int DEFAULT_RETRY_COUNT = 3;
public static final String SCHEME = S3StorageDruidModule.SCHEME;
private static final Logger log = new Logger(S3DataSegmentPuller.class);
protected static final String BUCKET = "bucket";
static final String BUCKET = "bucket";
protected static final String KEY = "key";
protected final ServerSideEncryptingAmazonS3 s3Client;
@ -72,7 +69,7 @@ public class S3DataSegmentPuller implements URIDataPuller
this.s3Client = s3Client;
}
FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir) throws SegmentLoadingException
FileUtils.FileCopyResult getSegmentFiles(final CloudObjectLocation s3Coords, final File outDir) throws SegmentLoadingException
{
log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
@ -84,7 +81,7 @@ public class S3DataSegmentPuller implements URIDataPuller
try {
org.apache.commons.io.FileUtils.forceMkdir(outDir);
final URI uri = URI.create(StringUtils.format("s3://%s/%s", s3Coords.bucket, s3Coords.path));
final URI uri = s3Coords.toUri(S3StorageDruidModule.SCHEME);
final ByteSource byteSource = new ByteSource()
{
@Override
@ -103,7 +100,7 @@ public class S3DataSegmentPuller implements URIDataPuller
}
}
};
if (CompressionUtils.isZip(s3Coords.path)) {
if (CompressionUtils.isZip(s3Coords.getPath())) {
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
byteSource,
outDir,
@ -113,7 +110,7 @@ public class S3DataSegmentPuller implements URIDataPuller
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath());
return result;
}
if (CompressionUtils.isGz(s3Coords.path)) {
if (CompressionUtils.isGz(s3Coords.getPath())) {
final String fname = Files.getNameWithoutExtension(uri.getPath());
final File outFile = new File(outDir, fname);
@ -139,16 +136,6 @@ public class S3DataSegmentPuller implements URIDataPuller
}
}
public static URI checkURI(URI uri)
{
if (uri.getScheme().equalsIgnoreCase(SCHEME)) {
uri = URI.create("s3" + uri.toString().substring(SCHEME.length()));
} else if (!"s3".equalsIgnoreCase(uri.getScheme())) {
throw new IAE("Don't know how to load scheme for URI [%s]", uri.toString());
}
return uri;
}
@Override
public InputStream getInputStream(URI uri) throws IOException
{
@ -162,8 +149,9 @@ public class S3DataSegmentPuller implements URIDataPuller
private FileObject buildFileObject(final URI uri) throws AmazonServiceException
{
final S3Coords coords = new S3Coords(checkURI(uri));
final S3ObjectSummary objectSummary = S3Utils.getSingleObjectSummary(s3Client, coords.bucket, coords.path);
final CloudObjectLocation coords = new CloudObjectLocation(S3Utils.checkURI(uri));
final S3ObjectSummary objectSummary =
S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath());
final String path = uri.getPath();
return new FileObject()
@ -289,8 +277,9 @@ public class S3DataSegmentPuller implements URIDataPuller
public String getVersion(URI uri) throws IOException
{
try {
final S3Coords coords = new S3Coords(checkURI(uri));
final S3ObjectSummary objectSummary = S3Utils.getSingleObjectSummary(s3Client, coords.bucket, coords.path);
final CloudObjectLocation coords = new CloudObjectLocation(S3Utils.checkURI(uri));
final S3ObjectSummary objectSummary =
S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath());
return StringUtils.format("%d", objectSummary.getLastModified().getTime());
}
catch (AmazonServiceException e) {
@ -303,11 +292,11 @@ public class S3DataSegmentPuller implements URIDataPuller
}
}
private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
private boolean isObjectInBucket(final CloudObjectLocation coords) throws SegmentLoadingException
{
try {
return S3Utils.retryS3Operation(
() -> S3Utils.isObjectInBucketIgnoringPermission(s3Client, coords.bucket, coords.path)
() -> S3Utils.isObjectInBucketIgnoringPermission(s3Client, coords.getBucket(), coords.getPath())
);
}
catch (AmazonS3Exception | IOException e) {
@ -317,35 +306,4 @@ public class S3DataSegmentPuller implements URIDataPuller
throw new RuntimeException(e);
}
}
protected static class S3Coords
{
String bucket;
String path;
public S3Coords(URI uri)
{
if (!"s3".equalsIgnoreCase(uri.getScheme())) {
throw new IAE("Unsupported scheme: [%s]", uri.getScheme());
}
bucket = uri.getHost();
String path = uri.getPath();
if (path.startsWith("/")) {
path = path.substring(1);
}
this.path = path;
}
public S3Coords(String bucket, String key)
{
this.bucket = bucket;
this.path = key;
}
@Override
public String toString()
{
return StringUtils.format("s3://%s/%s", bucket, path);
}
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.segment.loading.LoadSpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
@ -32,7 +33,7 @@ import java.io.File;
/**
*
*/
@JsonTypeName(S3StorageDruidModule.SCHEME)
@JsonTypeName(S3StorageDruidModule.SCHEME_S3_ZIP)
public class S3LoadSpec implements LoadSpec
{
private final String bucket;
@ -57,7 +58,7 @@ public class S3LoadSpec implements LoadSpec
@Override
public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir).size());
return new LoadSpecResult(puller.getSegmentFiles(new CloudObjectLocation(bucket, key), outDir).size());
}
@JsonProperty(S3DataSegmentPuller.BUCKET)

View File

@ -54,7 +54,9 @@ import java.util.List;
*/
public class S3StorageDruidModule implements DruidModule
{
public static final String SCHEME = "s3_zip";
public static final String SCHEME = "s3";
public static final String SCHEME_S3N = "s3n";
public static final String SCHEME_S3_ZIP = "s3_zip";
private static final Logger log = new Logger(S3StorageDruidModule.class);
@ -139,27 +141,27 @@ public class S3StorageDruidModule implements DruidModule
public void configure(Binder binder)
{
MapBinder.newMapBinder(binder, String.class, SearchableVersionedDataFinder.class)
.addBinding("s3")
.addBinding(SCHEME)
.to(S3TimestampVersionedDataFinder.class)
.in(LazySingleton.class);
MapBinder.newMapBinder(binder, String.class, SearchableVersionedDataFinder.class)
.addBinding("s3n")
.addBinding(SCHEME_S3N)
.to(S3TimestampVersionedDataFinder.class)
.in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(S3DataSegmentKiller.class).in(LazySingleton.class);
Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME).to(S3DataSegmentMover.class).in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME_S3_ZIP).to(S3DataSegmentKiller.class).in(LazySingleton.class);
Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME_S3_ZIP).to(S3DataSegmentMover.class).in(LazySingleton.class);
Binders.dataSegmentArchiverBinder(binder)
.addBinding(SCHEME)
.addBinding(SCHEME_S3_ZIP)
.to(S3DataSegmentArchiver.class)
.in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(S3DataSegmentPusher.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", S3StorageConfig.class);
JsonConfigProvider.bind(binder, "druid.storage.sse.kms", S3SSEKmsConfig.class);
JsonConfigProvider.bind(binder, "druid.storage.sse.custom", S3SSECustomConfig.class);
Binders.taskLogsBinder(binder).addBinding("s3").to(S3TaskLogs.class);
Binders.taskLogsBinder(binder).addBinding(SCHEME).to(S3TaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class);
binder.bind(S3TaskLogs.class).in(LazySingleton.class);
}

View File

@ -22,7 +22,7 @@ package org.apache.druid.storage.s3;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.inject.Inject;
import org.apache.druid.data.SearchableVersionedDataFinder;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
@ -57,39 +57,31 @@ public class S3TimestampVersionedDataFinder extends S3DataSegmentPuller implemen
public URI getLatestVersion(final URI uri, final @Nullable Pattern pattern)
{
try {
return RetryUtils.retry(
() -> {
final S3Coords coords = new S3Coords(checkURI(uri));
long mostRecent = Long.MIN_VALUE;
URI latest = null;
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
s3Client,
coords.bucket,
coords.path,
MAX_LISTING_KEYS
);
while (objectSummaryIterator.hasNext()) {
final S3ObjectSummary objectSummary = objectSummaryIterator.next();
String keyString = objectSummary.getKey().substring(coords.path.length());
if (keyString.startsWith("/")) {
keyString = keyString.substring(1);
}
if (pattern != null && !pattern.matcher(keyString).matches()) {
continue;
}
final long latestModified = objectSummary.getLastModified().getTime();
if (latestModified >= mostRecent) {
mostRecent = latestModified;
latest = new URI(
StringUtils.format("s3://%s/%s", objectSummary.getBucketName(), objectSummary.getKey())
);
}
}
return latest;
},
shouldRetryPredicate(),
DEFAULT_RETRY_COUNT
final CloudObjectLocation coords = new CloudObjectLocation(S3Utils.checkURI(uri));
long mostRecent = Long.MIN_VALUE;
URI latest = null;
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
s3Client,
uri,
MAX_LISTING_KEYS
);
while (objectSummaryIterator.hasNext()) {
final S3ObjectSummary objectSummary = objectSummaryIterator.next();
final CloudObjectLocation objectLocation = S3Utils.summaryToCloudObjectLocation(objectSummary);
// remove coords path prefix from object path
String keyString = StringUtils.maybeRemoveLeadingSlash(
objectLocation.getPath().substring(coords.getPath().length())
);
if (pattern != null && !pattern.matcher(keyString).matches()) {
continue;
}
final long latestModified = objectSummary.getLastModified().getTime();
if (latestModified >= mostRecent) {
mostRecent = latestModified;
latest = objectLocation.toUri(S3StorageDruidModule.SCHEME);
}
}
return latest;
}
catch (Exception e) {
throw new RuntimeException(e);

View File

@ -32,9 +32,14 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.RetryUtils.Task;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.File;
@ -48,15 +53,18 @@ import java.util.NoSuchElementException;
*/
public class S3Utils
{
private static final String SCHEME = S3StorageDruidModule.SCHEME;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private static final String MIMETYPE_JETS3T_DIRECTORY = "application/x-directory";
private static final Logger log = new Logger(S3Utils.class);
static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
{
final boolean isIOException = ex.getCause() instanceof IOException;
final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode());
return isIOException || isTimeout;
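// client errors (bad request, forbidden, not found) are not recoverable; retrying will not help them succeed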
final boolean badStatusCode = ex.getStatusCode() == 400 || ex.getStatusCode() == 403 || ex.getStatusCode() == 404;
return !badStatusCode && (isIOException || isTimeout);
}
public static final Predicate<Throwable> S3RETRY = new Predicate<Throwable>()
@ -80,7 +88,7 @@ public class S3Utils
* Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
* found, etc) are not retried.
*/
public static <T> T retryS3Operation(Task<T> f) throws Exception
static <T> T retryS3Operation(Task<T> f) throws Exception
{
return RetryUtils.retry(f, S3RETRY, RetryUtils.DEFAULT_MAX_TRIES);
}
@ -106,36 +114,105 @@ public class S3Utils
public static Iterator<S3ObjectSummary> objectSummaryIterator(
final ServerSideEncryptingAmazonS3 s3Client,
final String bucket,
final String prefix,
final URI prefix,
final int numMaxKeys
)
{
final ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(bucket)
.withPrefix(prefix)
.withMaxKeys(numMaxKeys);
return lazyFetchingObjectSummariesIterator(s3Client, Iterators.singletonIterator(prefix), numMaxKeys);
}
/**
* Create an iterator over a set of S3 objects specified by a set of 'prefixes', which may be paths or individual
* objects, yielding an {@link S3ObjectSummary} for each discovered object. The iterator is computed lazily as it is
* consumed, calling {@link ServerSideEncryptingAmazonS3#listObjectsV2} for each prefix in batches of
* {@code maxListingLength}. If the list API returns a 403 status code, it falls back to
* {@link ServerSideEncryptingAmazonS3#getObjectMetadata} to check whether the URI refers to a single object rather
* than a directory. Summaries are supplied to the outer iterator until drained; if additional results for the current
* prefix are still available, fetching continues and the process repeats, otherwise the iterator moves on to the next
* prefix, continuing until all objects have been evaluated.
*/
public static Iterator<S3ObjectSummary> lazyFetchingObjectSummariesIterator(
final ServerSideEncryptingAmazonS3 s3Client,
final Iterator<URI> uris,
final int maxListingLength
)
{
return new Iterator<S3ObjectSummary>()
{
private ListObjectsV2Request request;
private ListObjectsV2Result result;
private URI currentUri;
private String currentBucket;
private String currentPrefix;
private Iterator<S3ObjectSummary> objectSummaryIterator;
{
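// instance initializer: eagerly prepare and fetch the first batch of results for the first prefix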
prepareNextRequest();
fetchNextBatch();
}
private void prepareNextRequest()
{
currentUri = uris.next();
currentBucket = currentUri.getAuthority();
currentPrefix = S3Utils.extractS3Key(currentUri);
request = new ListObjectsV2Request()
.withBucketName(currentBucket)
.withPrefix(currentPrefix)
.withMaxKeys(maxListingLength);
}
private void fetchNextBatch()
{
result = s3Client.listObjectsV2(request);
objectSummaryIterator = result.getObjectSummaries().iterator();
request.setContinuationToken(result.getContinuationToken());
try {
result = S3Utils.retryS3Operation(() -> s3Client.listObjectsV2(request));
objectSummaryIterator = result.getObjectSummaries().iterator();
request.setContinuationToken(result.getContinuationToken());
}
catch (AmazonS3Exception outerException) {
log.error(outerException, "Exception while listing on %s", currentUri);
if (outerException.getStatusCode() == 403) {
// An "Access Denied" error means the user might not have permission to list objects at the given uri.
// Usually this is not a problem, but the uris might be full paths to input objects instead of prefixes.
// In that case, the user should still be able to fetch the objects if they have GetObject permission.
log.warn("Access denied for %s. Trying to get the object from the uri without listing", currentUri);
try {
final ObjectMetadata objectMetadata =
S3Utils.retryS3Operation(() -> s3Client.getObjectMetadata(currentBucket, currentPrefix));
if (!S3Utils.isDirectoryPlaceholder(currentPrefix, objectMetadata)) {
// it's not a directory, so just generate an object summary
S3ObjectSummary fabricated = new S3ObjectSummary();
fabricated.setBucketName(currentBucket);
fabricated.setKey(currentPrefix);
objectSummaryIterator = Iterators.singletonIterator(fabricated);
} else {
throw new RE(
"[%s] is a directory placeholder, "
+ "but failed to get the object list under the directory due to permission",
currentUri
);
}
}
catch (Exception innerException) {
throw new RuntimeException(innerException);
}
} else {
throw new RuntimeException(outerException);
}
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
public boolean hasNext()
{
return objectSummaryIterator.hasNext() || result.isTruncated();
return objectSummaryIterator.hasNext() || result.isTruncated() || uris.hasNext();
}
@Override
@ -151,13 +228,16 @@ public class S3Utils
if (result.isTruncated()) {
fetchNextBatch();
} else if (uris.hasNext()) {
prepareNextRequest();
fetchNextBatch();
}
if (!objectSummaryIterator.hasNext()) {
throw new ISE(
"Failed to further iterate on bucket[%s] and prefix[%s]. The last continuationToken was [%s]",
bucket,
prefix,
currentBucket,
currentPrefix,
result.getContinuationToken()
);
}
@ -167,7 +247,25 @@ public class S3Utils
};
}
public static String constructSegmentPath(String baseKey, String storageDir)
/**
* Create a {@link URI} from the given {@link S3ObjectSummary}. The resulting URI is composed as follows.
*
* <pre>
* {@code s3://{BUCKET_NAME}/{OBJECT_KEY}}
* </pre>
*/
public static URI summaryToUri(S3ObjectSummary object)
{
return summaryToCloudObjectLocation(object).toUri(SCHEME);
}
public static CloudObjectLocation summaryToCloudObjectLocation(S3ObjectSummary object)
{
return new CloudObjectLocation(object.getBucketName(), object.getKey());
}
static String constructSegmentPath(String baseKey, String storageDir)
{
return JOINER.join(
baseKey.isEmpty() ? null : baseKey,
@ -184,7 +282,17 @@ public class S3Utils
public static String extractS3Key(URI uri)
{
return uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath();
return StringUtils.maybeRemoveLeadingSlash(uri.getPath());
}
public static URI checkURI(URI uri)
{
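// rewrite the legacy 's3_zip' scheme to 's3'; any other non-s3 scheme is rejected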
if (uri.getScheme().equalsIgnoreCase(S3StorageDruidModule.SCHEME_S3_ZIP)) {
uri = URI.create(SCHEME + uri.toString().substring(S3StorageDruidModule.SCHEME_S3_ZIP.length()));
} else if (!SCHEME.equalsIgnoreCase(uri.getScheme())) {
throw new IAE("Invalid URI scheme [%s] must be [%s]", uri.toString(), SCHEME);
}
return uri;
}
// Copied from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder()
@ -254,7 +362,13 @@ public class S3Utils
* @param key The key under which to store the new object.
* @param file The path of the file to upload to Amazon S3.
*/
public static void uploadFileIfPossible(ServerSideEncryptingAmazonS3 service, boolean disableAcl, String bucket, String key, File file)
static void uploadFileIfPossible(
ServerSideEncryptingAmazonS3 service,
boolean disableAcl,
String bucket,
String key,
File file
)
{
final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, file);

View File

@ -15,3 +15,4 @@
org.apache.druid.storage.s3.S3StorageDruidModule
org.apache.druid.firehose.s3.S3FirehoseDruidModule
org.apache.druid.data.input.s3.S3InputSourceDruidModule

View File

@ -0,0 +1,449 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.module.guice.ObjectMapperModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.utils.CompressionUtils;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class S3InputSourceTest extends InitializedNullHandlingTest
{
private static final ObjectMapper MAPPER = createS3ObjectMapper();
private static final AmazonS3Client S3_CLIENT = EasyMock.createNiceMock(AmazonS3Client.class);
private static final ServerSideEncryptingAmazonS3 SERVICE = new ServerSideEncryptingAmazonS3(
S3_CLIENT,
new NoopServerSideEncryption()
);
private static final List<URI> EXPECTED_URIS = Arrays.asList(
URI.create("s3://foo/bar/file.csv"),
URI.create("s3://bar/foo/file2.csv")
);
private static final List<URI> EXPECTED_COMPRESSED_URIS = Arrays.asList(
URI.create("s3://foo/bar/file.csv.gz"),
URI.create("s3://bar/foo/file2.csv.gz")
);
private static final List<CloudObjectLocation> EXPECTED_COORDS =
EXPECTED_URIS.stream().map(CloudObjectLocation::new).collect(Collectors.toList());
private static final List<URI> PREFIXES = Arrays.asList(
URI.create("s3://foo/bar"),
URI.create("s3://bar/foo")
);
private static final List<CloudObjectLocation> EXPECTED_LOCATION =
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
private static final DateTime NOW = DateTimes.nowUtc();
private static final byte[] CONTENT =
StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerdeWithUris() throws Exception
{
final S3InputSource withUris = new S3InputSource(SERVICE, EXPECTED_URIS, null, null);
final S3InputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class);
Assert.assertEquals(withUris, serdeWithUris);
}
@Test
public void testSerdeWithPrefixes() throws Exception
{
final S3InputSource withPrefixes = new S3InputSource(SERVICE, null, PREFIXES, null);
final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
}
@Test
public void testSerdeWithObjects() throws Exception
{
final S3InputSource withPrefixes = new S3InputSource(
SERVICE,
null,
null,
EXPECTED_LOCATION
);
final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
}
@Test
public void testSerdeWithExtraEmptyLists() throws Exception
{
final S3InputSource withPrefixes = new S3InputSource(
SERVICE,
ImmutableList.of(),
ImmutableList.of(),
EXPECTED_LOCATION
);
final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
}
@Test
public void testSerdeWithInvalidArgs() throws Exception
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new S3InputSource(
SERVICE,
EXPECTED_URIS,
PREFIXES,
EXPECTED_LOCATION
);
}
@Test
public void testSerdeWithOtherInvalidArgs()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new S3InputSource(
SERVICE,
EXPECTED_URIS,
PREFIXES,
ImmutableList.of()
);
}
@Test
public void testSerdeWithOtherOtherInvalidArgs()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new S3InputSource(
SERVICE,
ImmutableList.of(),
PREFIXES,
EXPECTED_LOCATION
);
}
@Test
public void testWithUrisSplit()
{
S3InputSource inputSource = new S3InputSource(SERVICE, EXPECTED_URIS, null, null);
Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
null
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testWithPrefixesSplit()
{
EasyMock.reset(S3_CLIENT);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(SERVICE, null, PREFIXES, null);
Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
null
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testWithPrefixesWhereOneIsUrisAndNoListPermissionSplit()
{
EasyMock.reset(S3_CLIENT);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
addExpectedNonPrefixObjectsWithNoListPermission();
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
SERVICE,
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null
);
Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
null
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testReader() throws IOException
{
EasyMock.reset(S3_CLIENT);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
addExpectedNonPrefixObjectsWithNoListPermission();
addExpectedGetObjectMock(EXPECTED_URIS.get(0));
addExpectedGetObjectMock(EXPECTED_URIS.get(1));
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
SERVICE,
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null
);
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
ImmutableList.of("count")
);
InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
temporaryFolder.newFolder()
);
CloseableIterator<InputRow> iterator = reader.read();
while (iterator.hasNext()) {
InputRow nextRow = iterator.next();
Assert.assertEquals(NOW, nextRow.getTimestamp());
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
}
@Test
public void testCompressedReader() throws IOException
{
EasyMock.reset(S3_CLIENT);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)));
addExpectedNonPrefixObjectsWithNoListPermission();
addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(0));
addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(1));
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
SERVICE,
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null
);
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
ImmutableList.of("count")
);
InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
temporaryFolder.newFolder()
);
CloseableIterator<InputRow> iterator = reader.read();
while (iterator.hasNext()) {
InputRow nextRow = iterator.next();
Assert.assertEquals(NOW, nextRow.getTimestamp());
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
}
private static void addExpectedPrefixObjects(URI prefix, List<URI> uris)
{
final String s3Bucket = prefix.getAuthority();
final ListObjectsV2Result result = new ListObjectsV2Result();
result.setBucketName(s3Bucket);
result.setKeyCount(1);
for (URI uri : uris) {
final String key = S3Utils.extractS3Key(uri);
final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(s3Bucket);
objectSummary.setKey(key);
result.getObjectSummaries().add(objectSummary);
}
EasyMock.expect(S3_CLIENT.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))).andReturn(result).once();
}
private static void addExpectedNonPrefixObjectsWithNoListPermission()
{
    AmazonS3Exception boom = new AmazonS3Exception("oh dang, you can't list that bucket, friend");
boom.setStatusCode(403);
EasyMock.expect(S3_CLIENT.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))).andThrow(boom).once();
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(CONTENT.length);
metadata.setContentEncoding("text/csv");
metadata.setHeader(Headers.ETAG, "some-totally-real-etag-base64-hash-i-guess");
EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject(GetObjectMetadataRequest.class)))
.andReturn(metadata)
.once();
}
private static void addExpectedGetObjectMock(URI uri)
{
final String s3Bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
S3Object someObject = new S3Object();
someObject.setBucketName(s3Bucket);
someObject.setKey(key);
someObject.setObjectContent(new ByteArrayInputStream(CONTENT));
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).once();
}
private static void addExpectedGetCompressedObjectMock(URI uri) throws IOException
{
final String s3Bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
S3Object someObject = new S3Object();
someObject.setBucketName(s3Bucket);
someObject.setKey(key);
ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
CompressionUtils.gzip(new ByteArrayInputStream(CONTENT), gzipped);
someObject.setObjectContent(new ByteArrayInputStream(gzipped.toByteArray()));
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).once();
}
public static ObjectMapper createS3ObjectMapper()
{
DruidModule baseModule = new TestS3Module();
final Injector injector = Guice.createInjector(
new ObjectMapperModule(),
baseModule
);
final ObjectMapper baseMapper = injector.getInstance(ObjectMapper.class);
baseModule.getJacksonModules().forEach(baseMapper::registerModule);
return baseMapper;
}
public static class TestS3Module implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
      // A deserializer is needed for AmazonS3Client even though it is injected.
// See https://github.com/FasterXML/jackson-databind/issues/962.
return ImmutableList.of(new SimpleModule().addDeserializer(AmazonS3.class, new ItemDeserializer()));
}
@Override
public void configure(Binder binder)
{
}
@Provides
public ServerSideEncryptingAmazonS3 getAmazonS3Client()
{
return SERVICE;
}
}
public static class ItemDeserializer extends StdDeserializer<AmazonS3>
{
ItemDeserializer()
{
this(null);
}
ItemDeserializer(Class<?> vc)
{
super(vc);
}
@Override
public AmazonS3 deserialize(JsonParser jp, DeserializationContext ctxt)
{
throw new UnsupportedOperationException();
}
}
}
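For orientation, a hedged serde sketch against the mapper built by createS3ObjectMapper(); the type name "s3" and the "uris" property are assumptions inferred from the module registration and constructor rather than shown in this diff:

ObjectMapper mapper = S3InputSourceTest.createS3ObjectMapper();
// Hypothetical spec fragment; the type and property names are assumptions.
String json = "{\"type\": \"s3\", \"uris\": [\"s3://foo/bar/file.csv\"]}";
S3InputSource fromJson = mapper.readValue(json, S3InputSource.class);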

View File

@ -19,28 +19,12 @@
package org.apache.druid.firehose.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.module.guice.ObjectMapperModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.data.input.s3.S3InputSourceTest;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.easymock.EasyMock;
import org.junit.Assert;
@ -66,7 +50,7 @@ public class StaticS3FirehoseFactoryTest
@Test
public void testSerde() throws Exception
{
final ObjectMapper mapper = createObjectMapper(new TestS3Module());
final ObjectMapper mapper = S3InputSourceTest.createS3ObjectMapper();
final List<URI> uris = Arrays.asList(
new URI("s3://foo/bar/file.gz"),
@ -101,9 +85,6 @@ public class StaticS3FirehoseFactoryTest
);
uris.sort(Comparator.comparing(URI::toString));
uris.forEach(StaticS3FirehoseFactoryTest::addExpectedObjject);
EasyMock.replay(S3_CLIENT);
final StaticS3FirehoseFactory factory = new StaticS3FirehoseFactory(
SERVICE,
uris,
@ -131,72 +112,4 @@ public class StaticS3FirehoseFactoryTest
Assert.assertEquals(uris.get(i), subFactoryUris.get(0));
}
}
private static void addExpectedObjject(URI uri)
{
final String s3Bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(s3Bucket);
objectSummary.setKey(key);
final ListObjectsV2Result result = new ListObjectsV2Result();
result.setBucketName(s3Bucket);
result.setKeyCount(1);
result.getObjectSummaries().add(objectSummary);
EasyMock.expect(SERVICE.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))).andReturn(result);
}
private static ObjectMapper createObjectMapper(DruidModule baseModule)
{
final Injector injector = Guice.createInjector(
new ObjectMapperModule(),
baseModule
);
final ObjectMapper baseMapper = injector.getInstance(ObjectMapper.class);
baseModule.getJacksonModules().forEach(baseMapper::registerModule);
return baseMapper;
}
private static class TestS3Module implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
      // A deserializer is needed for AmazonS3Client even though it is injected.
// See https://github.com/FasterXML/jackson-databind/issues/962.
return ImmutableList.of(new SimpleModule().addDeserializer(AmazonS3.class, new ItemDeserializer()));
}
@Override
public void configure(Binder binder)
{
}
@Provides
public ServerSideEncryptingAmazonS3 getAmazonS3Client()
{
return SERVICE;
}
}
public static class ItemDeserializer extends StdDeserializer<AmazonS3>
{
public ItemDeserializer()
{
this(null);
}
public ItemDeserializer(Class<?> vc)
{
super(vc);
}
@Override
public AmazonS3 deserialize(JsonParser jp, DeserializationContext ctxt)
{
throw new UnsupportedOperationException();
}
}
}

View File

@ -85,7 +85,7 @@ public class S3DataSegmentArchiverTest
.version("version")
.loadSpec(ImmutableMap.of(
"type",
S3StorageDruidModule.SCHEME,
S3StorageDruidModule.SCHEME_S3_ZIP,
S3DataSegmentPuller.BUCKET,
"source_bucket",
S3DataSegmentPuller.KEY,
@ -107,7 +107,7 @@ public class S3DataSegmentArchiverTest
final DataSegment archivedSegment = SOURCE_SEGMENT
.withLoadSpec(ImmutableMap.of(
"type",
S3StorageDruidModule.SCHEME,
S3StorageDruidModule.SCHEME_S3_ZIP,
S3DataSegmentPuller.BUCKET,
ARCHIVER_CONFIG.getArchiveBucket(),
S3DataSegmentPuller.KEY,
@ -144,7 +144,7 @@ public class S3DataSegmentArchiverTest
final DataSegment archivedSegment = SOURCE_SEGMENT
.withLoadSpec(ImmutableMap.of(
"type",
S3StorageDruidModule.SCHEME,
S3StorageDruidModule.SCHEME_S3_ZIP,
S3DataSegmentPuller.BUCKET,
ARCHIVER_CONFIG.getArchiveBucket(),
S3DataSegmentPuller.KEY,

View File

@ -24,6 +24,7 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.loading.SegmentLoadingException;
@ -124,7 +125,7 @@ public class S3DataSegmentPullerTest
EasyMock.replay(s3Client);
FileUtils.FileCopyResult result = puller.getSegmentFiles(
new S3DataSegmentPuller.S3Coords(
new CloudObjectLocation(
bucket,
object0.getKey()
), tmpDir
@ -191,7 +192,7 @@ public class S3DataSegmentPullerTest
EasyMock.replay(s3Client);
FileUtils.FileCopyResult result = puller.getSegmentFiles(
new S3DataSegmentPuller.S3Coords(
new CloudObjectLocation(
bucket,
object0.getKey()
), tmpDir