Support finding segments in AWS S3. (#3399)

* support finding segments from a AWS S3 storage.

* add more Uts

* address comments and add a document for the feature.

* update docs indentation

* update docs indentation

* address comments.
1. add a Ut for json ser/deser for the config object.
2. more informant error message in a Ut.

* address comments.
1. use @Min to validate the configuration object
2. change updateDescriptor to a string as it does not take an argument otherwise

* fix a Ut failure - delete a Ut for testing default max length.
This commit is contained in:
jaehong choi 2016-10-11 09:27:09 +09:00 committed by Fangjin Yang
parent 1e79a1be82
commit 6f21778364
9 changed files with 771 additions and 11 deletions

View File

@ -94,3 +94,27 @@ and `druid-hdfs-storage` in the extension list.
After running this command, the segments table in `mysql` should store the new location for each segment we just inserted. After running this command, the segments table in `mysql` should store the new location for each segment we just inserted.
Note that for segments stored in HDFS, druid config must contain core-site.xml as described in [Druid Docs](http://druid.io/docs/latest/tutorials/cluster.html), as this new location is stored with relative path. Note that for segments stored in HDFS, druid config must contain core-site.xml as described in [Druid Docs](http://druid.io/docs/latest/tutorials/cluster.html), as this new location is stored with relative path.
It is also possible to use `s3` as deep storage. In order to work with it, specify `s3` as deep storage type and load
[`druid-s3-extensions`](../development/extensions-core/s3.html) as an extension.
```
java
-Ddruid.metadata.storage.type=mysql
-Ddruid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
-Ddruid.metadata.storage.connector.user=druid
-Ddruid.metadata.storage.connector.password=diurd
-Ddruid.extensions.loadList=[\"mysql-metadata-storage\",\"druid-s3-extensions\"]
-Ddruid.storage.type=s3
-Ddruid.s3.accessKey=...
-Ddruid.s3.secretKey=...
-Ddruid.storage.bucket=your-bucket
-Ddruid.storage.baseKey=druid/storage/wikipedia
-Ddruid.storage.maxListingLength=1000
-cp $DRUID_CLASSPATH
io.druid.cli.Main tools insert-segment-to-db --workingDir "druid/storage/wikipedia" --updateDescriptor true
```
Note that you can provide the location of segments with either `druid.storage.baseKey` or `--workingDir`. If both are
specified, `--workingDir` gets higher priority. `druid.storage.maxListingLength` is to determine the length of a
partial list in requesting a object listing to `s3`, which defaults to 1000.

View File

@ -0,0 +1,122 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.s3;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
public class S3DataSegmentFinder implements DataSegmentFinder
{
private static final Logger log = new Logger(S3DataSegmentFinder.class);
private final RestS3Service s3Client;
private final ObjectMapper jsonMapper;
private final S3DataSegmentPusherConfig config;
@Inject
public S3DataSegmentFinder(
RestS3Service s3Client,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
{
this.s3Client = s3Client;
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException
{
final Set<DataSegment> segments = Sets.newHashSet();
try {
Iterator<StorageObject> objectsIterator = S3Utils.storageObjectsIterator(
s3Client,
config.getBucket(),
workingDirPath.length() == 0 ? config.getBaseKey() : workingDirPath,
config.getMaxListingLength());
while(objectsIterator.hasNext()) {
StorageObject storageObject = objectsIterator.next();
storageObject.closeDataInputStream();
if (S3Utils.toFilename(storageObject.getKey()).equals("descriptor.json")) {
final String descriptorJson = storageObject.getKey();
String indexZip = S3Utils.indexZipForSegmentPath(descriptorJson);
if (S3Utils.isObjectInBucket(s3Client, config.getBucket(), indexZip)) {
S3Object indexObject = s3Client.getObject(config.getBucket(), descriptorJson);
try (InputStream is = indexObject.getDataInputStream()) {
final DataSegment dataSegment = jsonMapper.readValue(is, DataSegment.class);
log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip);
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
if (!loadSpec.get("type").equals(S3StorageDruidModule.SCHEME) || !loadSpec.get("key").equals(indexZip)) {
loadSpec.put("type", S3StorageDruidModule.SCHEME);
loadSpec.put("key", indexZip);
if (updateDescriptor) {
log.info(
"Updating loadSpec in descriptor.json at [%s] with new path [%s]",
descriptorJson,
indexObject
);
S3Object newDescJsonObject = new S3Object(descriptorJson, jsonMapper.writeValueAsString(dataSegment));
s3Client.putObject(config.getBucket(), newDescJsonObject);
}
}
segments.add(dataSegment);
}
} else {
throw new SegmentLoadingException(
"index.zip didn't exist at [%s] while descriptor.json exists!?",
indexZip
);
}
}
}
} catch (ServiceException e) {
throw new SegmentLoadingException(e, "Problem interacting with S3");
} catch (IOException e) {
throw new SegmentLoadingException(e, "IO exception");
} catch (Exception e) {
Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class);
Throwables.propagate(e);
}
return segments;
}
}

View File

@ -313,13 +313,6 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller
} }
} }
private String toFilename(String key, final String suffix)
{
String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end
return filename;
}
private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
{ {
try { try {

View File

@ -21,6 +21,8 @@ package io.druid.storage.s3;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Min;
/** /**
*/ */
public class S3DataSegmentPusherConfig public class S3DataSegmentPusherConfig
@ -34,6 +36,10 @@ public class S3DataSegmentPusherConfig
@JsonProperty @JsonProperty
private boolean disableAcl = false; private boolean disableAcl = false;
@JsonProperty
@Min(0)
private int maxListingLength = 1000;
public void setBucket(String bucket) public void setBucket(String bucket)
{ {
this.bucket = bucket; this.bucket = bucket;
@ -49,6 +55,11 @@ public class S3DataSegmentPusherConfig
this.disableAcl = disableAcl; this.disableAcl = disableAcl;
} }
public void setMaxListingLength(int maxListingLength)
{
this.maxListingLength = maxListingLength;
}
public String getBucket() public String getBucket()
{ {
return bucket; return bucket;
@ -63,4 +74,9 @@ public class S3DataSegmentPusherConfig
{ {
return disableAcl; return disableAcl;
} }
public int getMaxListingLength()
{
return maxListingLength;
}
} }

View File

@ -87,6 +87,7 @@ public class S3StorageDruidModule implements DruidModule
Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME).to(S3DataSegmentMover.class).in(LazySingleton.class); Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME).to(S3DataSegmentMover.class).in(LazySingleton.class);
Binders.dataSegmentArchiverBinder(binder).addBinding(SCHEME).to(S3DataSegmentArchiver.class).in(LazySingleton.class); Binders.dataSegmentArchiverBinder(binder).addBinding(SCHEME).to(S3DataSegmentArchiver.class).in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class); Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class);
Binders.dataSegmentFinderBinder(binder).addBinding("s3").to(S3DataSegmentFinder.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class);

View File

@ -21,14 +21,19 @@ package io.druid.storage.s3;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.metamx.common.RetryUtils; import com.metamx.common.RetryUtils;
import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.concurrent.Callable;
import org.jets3t.service.ServiceException; import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object; import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.Callable;
/** /**
* *
@ -107,6 +112,79 @@ public class S3Utils
return true; return true;
} }
public static Iterator<StorageObject> storageObjectsIterator(
final RestS3Service s3Client,
final String bucket,
final String prefix,
final long maxListingLength
)
{
return new Iterator<StorageObject>()
{
private StorageObjectsChunk objectsChunk;
private int objectsChunkOffset;
@Override
public boolean hasNext()
{
if (objectsChunk == null) {
objectsChunk = listObjectsChunkedAfter("");
objectsChunkOffset = 0;
}
if (objectsChunk.getObjects().length <= objectsChunkOffset) {
if (objectsChunk.isListingComplete()) {
return false;
} else {
objectsChunk = listObjectsChunkedAfter(objectsChunk.getPriorLastKey());
objectsChunkOffset = 0;
}
}
return true;
}
private StorageObjectsChunk listObjectsChunkedAfter(final String priorLastKey)
{
try {
return retryS3Operation(
new Callable<StorageObjectsChunk>()
{
@Override
public StorageObjectsChunk call() throws Exception
{
return s3Client.listObjectsChunked(
bucket, prefix, null, maxListingLength, priorLastKey);
}
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public StorageObject next()
{
if (!hasNext()) {
throw new IllegalStateException();
}
StorageObject storageObject = objectsChunk.getObjects()[objectsChunkOffset];
objectsChunkOffset++;
return storageObject;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
public static String constructSegmentPath(String baseKey, DataSegment segment) public static String constructSegmentPath(String baseKey, DataSegment segment)
{ {
@ -120,4 +198,21 @@ public class S3Utils
{ {
return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
} }
public static String indexZipForSegmentPath(String s3Path)
{
return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/index.zip";
}
public static String toFilename(String key)
{
return toFilename(key, "");
}
public static String toFilename(String key, final String suffix)
{
String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end
return filename;
}
} }

View File

@ -0,0 +1,436 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.s3;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class S3DataSegmentFinderTest
{
private static final ObjectMapper mapper = new DefaultObjectMapper();
private static final DataSegment SEGMENT_1 = DataSegment.builder()
.dataSource("wikipedia")
.interval(
new Interval(
"2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z"
)
)
.version("2015-10-21T22:07:57.074Z")
.loadSpec(
ImmutableMap.<String, Object>of(
"type",
"s3_zip",
"bucket",
"bucket1",
"key",
"abc/somewhere/index.zip"
)
)
.dimensions(ImmutableList.of("language", "page"))
.metrics(ImmutableList.of("count"))
.build();
private static final DataSegment SEGMENT_2 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-01T00:00:00.000Z/2013-09-02T00:00:00.000Z"
)
)
.build();
private static final DataSegment SEGMENT_3 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"
)
)
.version("2015-10-22T22:07:57.074Z")
.build();
private static final DataSegment SEGMENT_4_0 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"
)
)
.shardSpec(new NumberedShardSpec(0, 2))
.build();
private static final DataSegment SEGMENT_4_1 = DataSegment.builder(SEGMENT_1)
.interval(
new Interval(
"2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"
)
)
.shardSpec(new NumberedShardSpec(1, 2))
.build();
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
RestS3Service mockS3Client;
S3DataSegmentPusherConfig config;
private String bucket;
private String baseKey;
private String descriptor1;
private String descriptor2;
private String descriptor3;
private String descriptor4_0;
private String descriptor4_1;
private String indexZip1;
private String indexZip2;
private String indexZip3;
private String indexZip4_0;
private String indexZip4_1;
@BeforeClass
public static void setUpStatic()
{
mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered"));
}
@Before
public void setUp() throws Exception
{
bucket = "bucket1";
baseKey = "dataSource1";
config = new S3DataSegmentPusherConfig();
config.setBucket(bucket);
config.setBaseKey(baseKey);
mockS3Client = new MockStorageService(temporaryFolder.newFolder());
descriptor1 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval1/v1/0/");
descriptor2 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval2/v1/0/");
descriptor3 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval3/v2/0/");
descriptor4_0 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval4/v1/0/");
descriptor4_1 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval4/v1/1/");
indexZip1 = S3Utils.indexZipForSegmentPath(descriptor1);
indexZip2 = S3Utils.indexZipForSegmentPath(descriptor2);
indexZip3 = S3Utils.indexZipForSegmentPath(descriptor3);
indexZip4_0 = S3Utils.indexZipForSegmentPath(descriptor4_0);
indexZip4_1 = S3Utils.indexZipForSegmentPath(descriptor4_1);
mockS3Client.putObject(bucket, new S3Object(descriptor1, mapper.writeValueAsString(SEGMENT_1)));
mockS3Client.putObject(bucket, new S3Object(descriptor2, mapper.writeValueAsString(SEGMENT_2)));
mockS3Client.putObject(bucket, new S3Object(descriptor3, mapper.writeValueAsString(SEGMENT_3)));
mockS3Client.putObject(bucket, new S3Object(descriptor4_0, mapper.writeValueAsString(SEGMENT_4_0)));
mockS3Client.putObject(bucket, new S3Object(descriptor4_1, mapper.writeValueAsString(SEGMENT_4_1)));
mockS3Client.putObject(bucket, new S3Object(indexZip1, "dummy"));
mockS3Client.putObject(bucket, new S3Object(indexZip2, "dummy"));
mockS3Client.putObject(bucket, new S3Object(indexZip3, "dummy"));
mockS3Client.putObject(bucket, new S3Object(indexZip4_0, "dummy"));
mockS3Client.putObject(bucket, new S3Object(indexZip4_1, "dummy"));
}
@Test
public void testFindSegments() throws Exception
{
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
final Set<DataSegment> segments = s3DataSegmentFinder.findSegments("", false);
Assert.assertEquals(5, segments.size());
DataSegment updatedSegment1 = null;
DataSegment updatedSegment2 = null;
DataSegment updatedSegment3 = null;
DataSegment updatedSegment4_0 = null;
DataSegment updatedSegment4_1 = null;
for (DataSegment dataSegment : segments) {
if (dataSegment.getIdentifier().equals(SEGMENT_1.getIdentifier())) {
updatedSegment1 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_2.getIdentifier())) {
updatedSegment2 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_3.getIdentifier())) {
updatedSegment3 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_4_0.getIdentifier())) {
updatedSegment4_0 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_4_1.getIdentifier())) {
updatedSegment4_1 = dataSegment;
} else {
Assert.fail("Unexpected segment identifier : " + dataSegment.getIdentifier());
}
}
Assert.assertEquals(descriptor1, getDescriptorPath(updatedSegment1));
Assert.assertEquals(descriptor2, getDescriptorPath(updatedSegment2));
Assert.assertEquals(descriptor3, getDescriptorPath(updatedSegment3));
Assert.assertEquals(descriptor4_0, getDescriptorPath(updatedSegment4_0));
Assert.assertEquals(descriptor4_1, getDescriptorPath(updatedSegment4_1));
final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1);
final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2);
final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3);
final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0);
final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);
Assert.assertNotEquals(serializedSegment1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream()));
Assert.assertNotEquals(serializedSegment2,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream()));
Assert.assertNotEquals(serializedSegment3,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream()));
Assert.assertNotEquals(serializedSegment4_0,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream()));
Assert.assertNotEquals(serializedSegment4_1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream()));
final Set<DataSegment> segments2 = s3DataSegmentFinder.findSegments("", true);
Assert.assertEquals(segments, segments2);
Assert.assertEquals(serializedSegment1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream()));
Assert.assertEquals(serializedSegment2,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream()));
Assert.assertEquals(serializedSegment3,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream()));
Assert.assertEquals(serializedSegment4_0,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream()));
Assert.assertEquals(serializedSegment4_1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream()));
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail() throws SegmentLoadingException, ServiceException
{
mockS3Client.deleteObject(bucket, indexZip4_1);
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
s3DataSegmentFinder.findSegments("", false);
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail2() throws SegmentLoadingException
{
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(
mockS3Client, config, new DefaultObjectMapper());
try {
s3DataSegmentFinder.findSegments("", false);
}
catch (SegmentLoadingException e) {
Assert.assertTrue(e.getCause() instanceof IOException);
throw e;
}
}
@Test
public void testFindSegmentsWithmaxListingLength() throws SegmentLoadingException
{
config.setMaxListingLength(3);
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
final Set<DataSegment> segments = s3DataSegmentFinder.findSegments("", false);
Assert.assertEquals(5, segments.size());
}
@Test
public void testFindSegmentsWithworkingDirPath() throws SegmentLoadingException
{
config.setBaseKey("");
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
final Set<DataSegment> segments = s3DataSegmentFinder.findSegments(baseKey, false);
Assert.assertEquals(5, segments.size());
}
private String getDescriptorPath(DataSegment segment)
{
return S3Utils.descriptorPathForSegmentPath(String.valueOf(segment.getLoadSpec().get("key")));
}
private static class MockStorageService extends RestS3Service
{
private final File baseDir;
private final Map<String, Set<String>> storage = Maps.newHashMap();
public MockStorageService(File baseDir)
{
super(null);
this.baseDir = baseDir;
}
@Override
public StorageObjectsChunk listObjectsChunked(
final String bucketName, final String prefix, final String delimiter,
final long maxListingLength, final String priorLastKey
) throws ServiceException
{
List<String> keysOrigin = Lists.newArrayList(storage.get(bucketName));
Predicate<String> prefixFilter = new Predicate<String>()
{
@Override
public boolean apply(@Nullable String input)
{
return input.startsWith(prefix);
}
};
ImmutableList<String> keys = ImmutableList.copyOf(
Ordering.natural().sortedCopy(Iterables.filter(keysOrigin, prefixFilter)
)
);
int startOffset = 0;
if (priorLastKey != null) {
startOffset = keys.indexOf(priorLastKey) + 1;
}
int endOffset = startOffset + (int)maxListingLength; // exclusive
if (endOffset > keys.size()) {
endOffset = keys.size();
}
String newPriorLastkey = keys.get(endOffset - 1);
if (endOffset == (keys.size())) {
newPriorLastkey = null;
}
List<StorageObject> objects = Lists.newArrayList();
for(String objectKey : keys.subList(startOffset, endOffset)) {
objects.add(getObjectDetails(bucketName, objectKey));
}
return new StorageObjectsChunk(
prefix, delimiter, objects.toArray(new StorageObject[]{}), null, newPriorLastkey);
}
@Override
public StorageObject getObjectDetails(String bucketName, String objectKey) throws ServiceException
{
if (!storage.containsKey(bucketName)) {
ServiceException ex = new ServiceException();
ex.setResponseCode(404);
ex.setErrorCode("NoSuchBucket");
throw ex;
}
if (!storage.get(bucketName).contains(objectKey)) {
ServiceException ex = new ServiceException();
ex.setResponseCode(404);
ex.setErrorCode("NoSuchKey");
throw ex;
}
final File objectPath = new File(baseDir, objectKey);
StorageObject storageObject = new StorageObject();
storageObject.setBucketName(bucketName);
storageObject.setKey(objectKey);
storageObject.setDataInputFile(objectPath);
return storageObject;
}
@Override
public S3Object getObject(String bucketName, String objectKey) throws S3ServiceException
{
final File objectPath = new File(baseDir, objectKey);
S3Object s3Object = new S3Object();
s3Object.setBucketName(bucketName);
s3Object.setKey(objectKey);
s3Object.setDataInputFile(objectPath);
return s3Object;
}
@Override
public S3Object putObject(final String bucketName, final S3Object object) throws S3ServiceException
{
if (!storage.containsKey(bucketName)) {
storage.put(bucketName, Sets.<String>newHashSet());
}
storage.get(bucketName).add(object.getKey());
final File objectPath = new File(baseDir, object.getKey());
if (!objectPath.getParentFile().exists()) {
objectPath.getParentFile().mkdirs();
}
try {
try (
InputStream in = object.getDataInputStream()
) {
FileUtils.copyInputStreamToFile(in, objectPath);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return object;
}
@Override
public void deleteObject(String bucketName, String objectKey) throws ServiceException
{
storage.get(bucketName).remove(objectKey);
final File objectPath = new File(baseDir, objectKey);
objectPath.delete();
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.s3;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.Validator;
import java.io.IOException;
import java.util.Set;
public class S3DataSegmentPusherConfigTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testSerialization() throws IOException
{
String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\","
+"\"disableAcl\":false,\"maxListingLength\":2000}";
S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class);
Assert.assertEquals(jsonConfig, jsonMapper.writeValueAsString(config));
}
@Test
public void testSerializationWithDefaults() throws IOException
{
String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"}";
String expectedJsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\","
+"\"disableAcl\":false,\"maxListingLength\":1000}";
S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class);
Assert.assertEquals(expectedJsonConfig, jsonMapper.writeValueAsString(config));
}
@Test
public void testSerializationValidatingMaxListingLength() throws IOException
{
String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\","
+"\"disableAcl\":false,\"maxListingLength\":-1}";
Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class);
Set<ConstraintViolation<S3DataSegmentPusherConfig>> violations = validator.validate(config);
Assert.assertEquals(1, violations.size());
ConstraintViolation violation = Iterators.getOnlyElement(violations.iterator());
Assert.assertEquals("must be greater than or equal to 0", violation.getMessage());
}
}

View File

@ -58,7 +58,7 @@ public class InsertSegment extends GuiceRunnable
private String workingDirPath; private String workingDirPath;
@Option(name = "--updateDescriptor", description = "if set to true, this tool will update loadSpec field in descriptor.json if the path in loadSpec is different from where desciptor.json was found. Default value is true", required = false) @Option(name = "--updateDescriptor", description = "if set to true, this tool will update loadSpec field in descriptor.json if the path in loadSpec is different from where desciptor.json was found. Default value is true", required = false)
private boolean updateDescriptor = true; private String updateDescriptor;
private ObjectMapper mapper; private ObjectMapper mapper;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@ -97,7 +97,7 @@ public class InsertSegment extends GuiceRunnable
Set<DataSegment> segments = null; Set<DataSegment> segments = null;
try { try {
segments = dataSegmentFinder.findSegments(workingDirPath, updateDescriptor); segments = dataSegmentFinder.findSegments(workingDirPath, Boolean.valueOf(updateDescriptor));
} }
catch (SegmentLoadingException e) { catch (SegmentLoadingException e) {
Throwables.propagate(e); Throwables.propagate(e);