diff --git a/docs/content/operations/insert-segment-to-db.md b/docs/content/operations/insert-segment-to-db.md
index 83b2139e651..2ae9465c5e8 100644
--- a/docs/content/operations/insert-segment-to-db.md
+++ b/docs/content/operations/insert-segment-to-db.md
@@ -94,3 +94,27 @@ and `druid-hdfs-storage` in the extension list.
 After running this command, the segments table in `mysql` should store the new location for each segment we just inserted.
 Note that for segments stored in HDFS, druid config must contain core-site.xml as described in [Druid Docs](http://druid.io/docs/latest/tutorials/cluster.html), as this new location is stored with relative path.
+
+It is also possible to use `s3` as deep storage. To work with it, specify `s3` as the deep storage type and load
+[`druid-s3-extensions`](../development/extensions-core/s3.html) in the extension list.
+
+```
+java
+-Ddruid.metadata.storage.type=mysql
+-Ddruid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
+-Ddruid.metadata.storage.connector.user=druid
+-Ddruid.metadata.storage.connector.password=diurd
+-Ddruid.extensions.loadList=[\"mysql-metadata-storage\",\"druid-s3-extensions\"]
+-Ddruid.storage.type=s3
+-Ddruid.s3.accessKey=...
+-Ddruid.s3.secretKey=...
+-Ddruid.storage.bucket=your-bucket
+-Ddruid.storage.baseKey=druid/storage/wikipedia
+-Ddruid.storage.maxListingLength=1000
+-cp $DRUID_CLASSPATH
+io.druid.cli.Main tools insert-segment-to-db --workingDir "druid/storage/wikipedia" --updateDescriptor true
+```
+
+Note that the location of segments can be provided with either `druid.storage.baseKey` or `--workingDir`. If both are
+specified, `--workingDir` takes precedence. `druid.storage.maxListingLength` sets the maximum number of keys returned
+per object-listing request to `s3`; it defaults to 1000.
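+
+For example, when run with `--updateDescriptor true`, any `descriptor.json` whose `loadSpec` no longer matches the
+location it was found at is rewritten in place to point at the `index.zip` stored next to it. The bucket and key
+below are illustrative:
+
+```
+"loadSpec" : {
+  "type" : "s3_zip",
+  "bucket" : "your-bucket",
+  "key" : "druid/storage/wikipedia/.../index.zip"
+}
+```
\ No newline at end of file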
diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentFinder.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentFinder.java
new file mode 100644
index 00000000000..302574f0455
--- /dev/null
+++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentFinder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.s3;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import com.metamx.common.logger.Logger;
+import io.druid.segment.loading.DataSegmentFinder;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import org.jets3t.service.ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.model.StorageObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+public class S3DataSegmentFinder implements DataSegmentFinder
+{
+  private static final Logger log = new Logger(S3DataSegmentFinder.class);
+
+  private final RestS3Service s3Client;
+  private final ObjectMapper jsonMapper;
+  private final S3DataSegmentPusherConfig config;
+
+  @Inject
+  public S3DataSegmentFinder(
+      RestS3Service s3Client,
+      S3DataSegmentPusherConfig config,
+      ObjectMapper jsonMapper
+  )
+  {
+    this.s3Client = s3Client;
+    this.config = config;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException
+  {
+    final Set<DataSegment> segments = Sets.newHashSet();
+
+    try {
+      Iterator<StorageObject> objectsIterator = S3Utils.storageObjectsIterator(
+          s3Client,
+          config.getBucket(),
+          workingDirPath.length() == 0 ? config.getBaseKey() : workingDirPath,
+          config.getMaxListingLength());
+
+      while (objectsIterator.hasNext()) {
+        StorageObject storageObject = objectsIterator.next();
+        storageObject.closeDataInputStream();
+
+        if (S3Utils.toFilename(storageObject.getKey()).equals("descriptor.json")) {
+          final String descriptorJson = storageObject.getKey();
+          String indexZip = S3Utils.indexZipForSegmentPath(descriptorJson);
+
+          if (S3Utils.isObjectInBucket(s3Client, config.getBucket(), indexZip)) {
+            S3Object descriptorObject = s3Client.getObject(config.getBucket(), descriptorJson);
+
+            try (InputStream is = descriptorObject.getDataInputStream()) {
+              final DataSegment dataSegment = jsonMapper.readValue(is, DataSegment.class);
+              log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip);
+
+              final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
+              if (!loadSpec.get("type").equals(S3StorageDruidModule.SCHEME) || !loadSpec.get("key").equals(indexZip)) {
+                loadSpec.put("type", S3StorageDruidModule.SCHEME);
+                loadSpec.put("key", indexZip);
+                if (updateDescriptor) {
+                  log.info(
+                      "Updating loadSpec in descriptor.json at [%s] with new path [%s]",
+                      descriptorJson,
+                      indexZip
+                  );
+                  S3Object newDescJsonObject = new S3Object(descriptorJson, jsonMapper.writeValueAsString(dataSegment));
+                  s3Client.putObject(config.getBucket(), newDescJsonObject);
+                }
+              }
+              segments.add(dataSegment);
+            }
+          } else {
+            throw new SegmentLoadingException(
+                "index.zip didn't exist at [%s] while descriptor.json exists!?",
+                indexZip
+            );
+          }
+        }
+      }
+    } catch (ServiceException e) {
+      throw new SegmentLoadingException(e, "Problem interacting with S3");
+    } catch (IOException e) {
+      throw new SegmentLoadingException(e, "IO exception");
+    } catch (Exception e) {
+      Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class);
+      Throwables.propagate(e);
+    }
+    return segments;
+  }
+}
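A minimal sketch of driving the new finder by hand, outside of Guice and the CLI. The class name, credentials, and bucket below are placeholders and not part of this patch; the constructor, the config setters, and the `findSegments` contract are the ones added here:

```java
import io.druid.jackson.DefaultObjectMapper;
import io.druid.storage.s3.S3DataSegmentFinder;
import io.druid.storage.s3.S3DataSegmentPusherConfig;
import io.druid.timeline.DataSegment;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;

import java.util.Set;

public class S3FinderSketch
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder credentials; client construction is simplified for illustration.
    RestS3Service s3Client = new RestS3Service(new AWSCredentials("access-key", "secret-key"));

    S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig();
    config.setBucket("your-bucket");              // druid.storage.bucket
    config.setBaseKey("druid/storage/wikipedia"); // druid.storage.baseKey
    config.setMaxListingLength(1000);             // druid.storage.maxListingLength

    S3DataSegmentFinder finder = new S3DataSegmentFinder(s3Client, config, new DefaultObjectMapper());

    // An empty workingDirPath falls back to baseKey, mirroring the CLI behavior;
    // `false` leaves every descriptor.json untouched and only reports what was found.
    Set<DataSegment> segments = finder.findSegments("", false);
    System.out.println("found " + segments.size() + " segments");
  }
}
```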
diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java
index 4a424ae9f18..772b646e35b 100644
--- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java
+++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java
@@ -313,13 +313,6 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller
     }
   }
 
-  private String toFilename(String key, final String suffix)
-  {
-    String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
-    filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end
-    return filename;
-  }
-
   private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
   {
     try {
diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java
index fc4f384ee2b..7937ea339d1 100644
--- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java
+++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java
@@ -21,6 +21,8 @@ package io.druid.storage.s3;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.validation.constraints.Min;
+
 /**
  */
 public class S3DataSegmentPusherConfig
@@ -34,6 +36,10 @@ public class S3DataSegmentPusherConfig
   @JsonProperty
   private boolean disableAcl = false;
 
+  @JsonProperty
+  @Min(0)
+  private int maxListingLength = 1000;
+
   public void setBucket(String bucket)
   {
     this.bucket = bucket;
@@ -49,6 +55,11 @@ public class S3DataSegmentPusherConfig
     this.disableAcl = disableAcl;
   }
 
+  public void setMaxListingLength(int maxListingLength)
+  {
+    this.maxListingLength = maxListingLength;
+  }
+
   public String getBucket()
   {
     return bucket;
@@ -63,4 +74,9 @@ public class S3DataSegmentPusherConfig
   {
     return disableAcl;
   }
+
+  public int getMaxListingLength()
+  {
+    return maxListingLength;
+  }
 }
diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java
index 7da7753b44a..2d97f6ee2b9 100644
--- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java
+++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java
@@ -87,6 +87,7 @@ public class S3StorageDruidModule implements DruidModule
     Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME).to(S3DataSegmentMover.class).in(LazySingleton.class);
     Binders.dataSegmentArchiverBinder(binder).addBinding(SCHEME).to(S3DataSegmentArchiver.class).in(LazySingleton.class);
     Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class);
+    Binders.dataSegmentFinderBinder(binder).addBinding("s3").to(S3DataSegmentFinder.class).in(LazySingleton.class);
 
     JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
     JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class);
diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java
index 2c0f22a071b..0772b08d098 100644
--- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java
+++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java
@@ -21,14 +21,19 @@ package io.druid.storage.s3;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
 import com.metamx.common.RetryUtils;
 import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
-import java.io.IOException;
-import java.util.concurrent.Callable;
 import org.jets3t.service.ServiceException;
+import org.jets3t.service.StorageObjectsChunk;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
 import org.jets3t.service.model.S3Object;
+import org.jets3t.service.model.StorageObject;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 
 /**
  *
@@ -107,6 +112,79 @@
     return true;
   }
 
+  public static Iterator<StorageObject> storageObjectsIterator(
+      final RestS3Service s3Client,
+      final String bucket,
+      final String prefix,
+      final long maxListingLength
+  )
+  {
+    return new Iterator<StorageObject>()
+    {
+      private StorageObjectsChunk objectsChunk;
+      private int objectsChunkOffset;
+
+      @Override
+      public boolean hasNext()
+      {
+        // Lazily fetch the first chunk, then page through subsequent chunks as they are exhausted.
+        if (objectsChunk == null) {
+          objectsChunk = listObjectsChunkedAfter("");
+          objectsChunkOffset = 0;
+        }
+
+        if (objectsChunk.getObjects().length <= objectsChunkOffset) {
+          if (objectsChunk.isListingComplete()) {
+            return false;
+          } else {
+            objectsChunk = listObjectsChunkedAfter(objectsChunk.getPriorLastKey());
+            objectsChunkOffset = 0;
+          }
+        }
+
+        return true;
+      }
+
+      private StorageObjectsChunk listObjectsChunkedAfter(final String priorLastKey)
+      {
+        try {
+          return retryS3Operation(
+              new Callable<StorageObjectsChunk>()
+              {
+                @Override
+                public StorageObjectsChunk call() throws Exception
+                {
+                  return s3Client.listObjectsChunked(
+                      bucket, prefix, null, maxListingLength, priorLastKey);
+                }
+              }
+          );
+        }
+        catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+      }
+
+      @Override
+      public StorageObject next()
+      {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        StorageObject storageObject = objectsChunk.getObjects()[objectsChunkOffset];
+        objectsChunkOffset++;
+
+        return storageObject;
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
 
   public static String constructSegmentPath(String baseKey, DataSegment segment)
   {
@@ -120,4 +198,21 @@
   {
     return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
   }
+
+  public static String indexZipForSegmentPath(String s3Path)
+  {
+    return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/index.zip";
+  }
+
+  public static String toFilename(String key)
+  {
+    return toFilename(key, "");
+  }
+
+  public static String toFilename(String key, final String suffix)
+  {
+    String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
+    filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end
+    return filename;
+  }
 }
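The three path helpers compose around the fixed `<segment>/descriptor.json` and `<segment>/index.zip` layout. A small illustration (the class is hypothetical; keys follow the naming used in the tests below):

```java
package io.druid.storage.s3;

// Illustration only: exercises the new pure-string helpers in S3Utils.
public class S3UtilsPathsSketch
{
  public static void main(String[] args)
  {
    // Given any key inside a segment directory, derive its sibling keys.
    String descriptor = S3Utils.descriptorPathForSegmentPath("dataSource1/interval1/v1/0/index.zip");
    // -> "dataSource1/interval1/v1/0/descriptor.json"
    String indexZip = S3Utils.indexZipForSegmentPath(descriptor);
    // -> "dataSource1/interval1/v1/0/index.zip"
    String filename = S3Utils.toFilename(descriptor);
    // -> "descriptor.json" (the exact name S3DataSegmentFinder filters listing results on)
    System.out.println(descriptor + "\n" + indexZip + "\n" + filename);
  }
}
```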
diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentFinderTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentFinderTest.java
new file mode 100644
index 00000000000..aa236b2e403
--- /dev/null
+++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentFinderTest.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.s3;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NumberedShardSpec;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.ServiceException;
+import org.jets3t.service.StorageObjectsChunk;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.model.StorageObject;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class S3DataSegmentFinderTest
+{
+  private static final ObjectMapper mapper = new DefaultObjectMapper();
+
+  private static final DataSegment SEGMENT_1 = DataSegment.builder()
+      .dataSource("wikipedia")
+      .interval(new Interval("2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z"))
+      .version("2015-10-21T22:07:57.074Z")
+      .loadSpec(
+          ImmutableMap.<String, Object>of(
+              "type", "s3_zip",
+              "bucket", "bucket1",
+              "key", "abc/somewhere/index.zip"
+          )
+      )
+      .dimensions(ImmutableList.of("language", "page"))
+      .metrics(ImmutableList.of("count"))
+      .build();
+
+  private static final DataSegment SEGMENT_2 = DataSegment.builder(SEGMENT_1)
+      .interval(new Interval("2013-09-01T00:00:00.000Z/2013-09-02T00:00:00.000Z"))
+      .build();
+
+  private static final DataSegment SEGMENT_3 = DataSegment.builder(SEGMENT_1)
+      .interval(new Interval("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
+      .version("2015-10-22T22:07:57.074Z")
+      .build();
+
+  private static final DataSegment SEGMENT_4_0 = DataSegment.builder(SEGMENT_1)
+      .interval(new Interval("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
+      .shardSpec(new NumberedShardSpec(0, 2))
+      .build();
+
+  private static final DataSegment SEGMENT_4_1 = DataSegment.builder(SEGMENT_1)
"2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z" + ) + ) + .shardSpec(new NumberedShardSpec(1, 2)) + .build(); + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + RestS3Service mockS3Client; + S3DataSegmentPusherConfig config; + + private String bucket; + private String baseKey; + + private String descriptor1; + private String descriptor2; + private String descriptor3; + private String descriptor4_0; + private String descriptor4_1; + private String indexZip1; + private String indexZip2; + private String indexZip3; + private String indexZip4_0; + private String indexZip4_1; + + + + @BeforeClass + public static void setUpStatic() + { + mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered")); + } + + @Before + public void setUp() throws Exception + { + bucket = "bucket1"; + baseKey = "dataSource1"; + + config = new S3DataSegmentPusherConfig(); + config.setBucket(bucket); + config.setBaseKey(baseKey); + + mockS3Client = new MockStorageService(temporaryFolder.newFolder()); + + + descriptor1 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval1/v1/0/"); + descriptor2 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval2/v1/0/"); + descriptor3 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval3/v2/0/"); + descriptor4_0 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval4/v1/0/"); + descriptor4_1 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval4/v1/1/"); + indexZip1 = S3Utils.indexZipForSegmentPath(descriptor1); + indexZip2 = S3Utils.indexZipForSegmentPath(descriptor2); + indexZip3 = S3Utils.indexZipForSegmentPath(descriptor3); + indexZip4_0 = S3Utils.indexZipForSegmentPath(descriptor4_0); + indexZip4_1 = S3Utils.indexZipForSegmentPath(descriptor4_1); + + mockS3Client.putObject(bucket, new S3Object(descriptor1, mapper.writeValueAsString(SEGMENT_1))); + mockS3Client.putObject(bucket, new S3Object(descriptor2, mapper.writeValueAsString(SEGMENT_2))); + mockS3Client.putObject(bucket, new S3Object(descriptor3, mapper.writeValueAsString(SEGMENT_3))); + mockS3Client.putObject(bucket, new S3Object(descriptor4_0, mapper.writeValueAsString(SEGMENT_4_0))); + mockS3Client.putObject(bucket, new S3Object(descriptor4_1, mapper.writeValueAsString(SEGMENT_4_1))); + + mockS3Client.putObject(bucket, new S3Object(indexZip1, "dummy")); + mockS3Client.putObject(bucket, new S3Object(indexZip2, "dummy")); + mockS3Client.putObject(bucket, new S3Object(indexZip3, "dummy")); + mockS3Client.putObject(bucket, new S3Object(indexZip4_0, "dummy")); + mockS3Client.putObject(bucket, new S3Object(indexZip4_1, "dummy")); + } + + @Test + public void testFindSegments() throws Exception + { + final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper); + final Set segments = s3DataSegmentFinder.findSegments("", false); + + Assert.assertEquals(5, segments.size()); + + DataSegment updatedSegment1 = null; + DataSegment updatedSegment2 = null; + DataSegment updatedSegment3 = null; + DataSegment updatedSegment4_0 = null; + DataSegment updatedSegment4_1 = null; + + for (DataSegment dataSegment : segments) { + if (dataSegment.getIdentifier().equals(SEGMENT_1.getIdentifier())) { + updatedSegment1 = dataSegment; + } else if (dataSegment.getIdentifier().equals(SEGMENT_2.getIdentifier())) { + updatedSegment2 = dataSegment; + } else if (dataSegment.getIdentifier().equals(SEGMENT_3.getIdentifier())) { + updatedSegment3 = dataSegment; + } else if (dataSegment.getIdentifier().equals(SEGMENT_4_0.getIdentifier())) 
+        updatedSegment4_0 = dataSegment;
+      } else if (dataSegment.getIdentifier().equals(SEGMENT_4_1.getIdentifier())) {
+        updatedSegment4_1 = dataSegment;
+      } else {
+        Assert.fail("Unexpected segment identifier : " + dataSegment.getIdentifier());
+      }
+    }
+
+    Assert.assertEquals(descriptor1, getDescriptorPath(updatedSegment1));
+    Assert.assertEquals(descriptor2, getDescriptorPath(updatedSegment2));
+    Assert.assertEquals(descriptor3, getDescriptorPath(updatedSegment3));
+    Assert.assertEquals(descriptor4_0, getDescriptorPath(updatedSegment4_0));
+    Assert.assertEquals(descriptor4_1, getDescriptorPath(updatedSegment4_1));
+
+    final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1);
+    final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2);
+    final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3);
+    final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0);
+    final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);
+
+    // findSegments(..., false) must not touch the stored descriptors.
+    Assert.assertNotEquals(serializedSegment1,
+                           IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream()));
+    Assert.assertNotEquals(serializedSegment2,
+                           IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream()));
+    Assert.assertNotEquals(serializedSegment3,
+                           IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream()));
+    Assert.assertNotEquals(serializedSegment4_0,
+                           IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream()));
+    Assert.assertNotEquals(serializedSegment4_1,
+                           IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream()));
+
+    final Set<DataSegment> segments2 = s3DataSegmentFinder.findSegments("", true);
+
+    Assert.assertEquals(segments, segments2);
+
+    // findSegments(..., true) rewrites each descriptor.json to match the discovered location.
+    Assert.assertEquals(serializedSegment1,
+                        IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream()));
+    Assert.assertEquals(serializedSegment2,
+                        IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream()));
+    Assert.assertEquals(serializedSegment3,
+                        IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream()));
+    Assert.assertEquals(serializedSegment4_0,
+                        IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream()));
+    Assert.assertEquals(serializedSegment4_1,
+                        IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream()));
+  }
+
+  @Test(expected = SegmentLoadingException.class)
+  public void testFindSegmentsFail() throws SegmentLoadingException, ServiceException
+  {
+    // Remove one index.zip while leaving its descriptor.json in place.
+    mockS3Client.deleteObject(bucket, indexZip4_1);
+
+    final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
+    s3DataSegmentFinder.findSegments("", false);
+  }
+
+  @Test(expected = SegmentLoadingException.class)
+  public void testFindSegmentsFail2() throws SegmentLoadingException
+  {
+    // A mapper that does not know about NumberedShardSpec cannot deserialize the descriptors.
+    final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(
+        mockS3Client, config, new DefaultObjectMapper());
+
+    try {
+      s3DataSegmentFinder.findSegments("", false);
+    }
+    catch (SegmentLoadingException e) {
+      Assert.assertTrue(e.getCause() instanceof IOException);
+      throw e;
+    }
+  }
+
+  @Test
+  public void testFindSegmentsWithMaxListingLength() throws SegmentLoadingException
+  {
+    // Force several listing round trips by shrinking the chunk size below the object count.
+    config.setMaxListingLength(3);
+    final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
+    final Set<DataSegment> segments = s3DataSegmentFinder.findSegments("", false);
+    Assert.assertEquals(5, segments.size());
+  }
+
+  @Test
+  public void testFindSegmentsWithWorkingDirPath() throws SegmentLoadingException
+  {
+    config.setBaseKey("");
+    final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
+    final Set<DataSegment> segments = s3DataSegmentFinder.findSegments(baseKey, false);
+    Assert.assertEquals(5, segments.size());
+  }
+
+  private String getDescriptorPath(DataSegment segment)
+  {
+    return S3Utils.descriptorPathForSegmentPath(String.valueOf(segment.getLoadSpec().get("key")));
+  }
+
+  private static class MockStorageService extends RestS3Service
+  {
+    private final File baseDir;
+    private final Map<String, Set<String>> storage = Maps.newHashMap();
+
+    public MockStorageService(File baseDir)
+    {
+      super(null);
+      this.baseDir = baseDir;
+    }
+
+    @Override
+    public StorageObjectsChunk listObjectsChunked(
+        final String bucketName, final String prefix, final String delimiter,
+        final long maxListingLength, final String priorLastKey
+    ) throws ServiceException
+    {
+      List<String> keysOrigin = Lists.newArrayList(storage.get(bucketName));
+
+      Predicate<String> prefixFilter = new Predicate<String>()
+      {
+        @Override
+        public boolean apply(@Nullable String input)
+        {
+          return input.startsWith(prefix);
+        }
+      };
+
+      ImmutableList<String> keys = ImmutableList.copyOf(
+          Ordering.natural().sortedCopy(Iterables.filter(keysOrigin, prefixFilter))
+      );
+
+      int startOffset = 0;
+      if (priorLastKey != null) {
+        startOffset = keys.indexOf(priorLastKey) + 1;
+      }
+
+      int endOffset = startOffset + (int) maxListingLength; // exclusive
+      if (endOffset > keys.size()) {
+        endOffset = keys.size();
+      }
+
+      String newPriorLastKey = keys.get(endOffset - 1);
+      if (endOffset == keys.size()) {
+        newPriorLastKey = null;
+      }
+
+      List<StorageObject> objects = Lists.newArrayList();
+      for (String objectKey : keys.subList(startOffset, endOffset)) {
+        objects.add(getObjectDetails(bucketName, objectKey));
+      }
+
+      return new StorageObjectsChunk(
+          prefix, delimiter, objects.toArray(new StorageObject[]{}), null, newPriorLastKey);
+    }
+
+    @Override
+    public StorageObject getObjectDetails(String bucketName, String objectKey) throws ServiceException
+    {
+      if (!storage.containsKey(bucketName)) {
+        ServiceException ex = new ServiceException();
+        ex.setResponseCode(404);
+        ex.setErrorCode("NoSuchBucket");
+        throw ex;
+      }
+
+      if (!storage.get(bucketName).contains(objectKey)) {
+        ServiceException ex = new ServiceException();
+        ex.setResponseCode(404);
+        ex.setErrorCode("NoSuchKey");
+        throw ex;
+      }
+
+      final File objectPath = new File(baseDir, objectKey);
+      StorageObject storageObject = new StorageObject();
+      storageObject.setBucketName(bucketName);
+      storageObject.setKey(objectKey);
+      storageObject.setDataInputFile(objectPath);
+
+      return storageObject;
+    }
+
+    @Override
+    public S3Object getObject(String bucketName, String objectKey) throws S3ServiceException
+    {
+      final File objectPath = new File(baseDir, objectKey);
+      S3Object s3Object = new S3Object();
+      s3Object.setBucketName(bucketName);
+      s3Object.setKey(objectKey);
+      s3Object.setDataInputFile(objectPath);
+
+      return s3Object;
+    }
+
+    @Override
+    public S3Object putObject(final String bucketName, final S3Object object) throws S3ServiceException
+    {
+      if (!storage.containsKey(bucketName)) {
+        storage.put(bucketName, Sets.<String>newHashSet());
+      }
+      storage.get(bucketName).add(object.getKey());
+
+      final File objectPath = new File(baseDir, object.getKey());
+
+      if (!objectPath.getParentFile().exists()) {
+        objectPath.getParentFile().mkdirs();
+      }
+
+      try (InputStream in = object.getDataInputStream()) {
+        FileUtils.copyInputStreamToFile(in, objectPath);
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+
+      return object;
+    }
+
+    @Override
+    public void deleteObject(String bucketName, String objectKey) throws ServiceException
+    {
+      storage.get(bucketName).remove(objectKey);
+      final File objectPath = new File(baseDir, objectKey);
+      objectPath.delete();
+    }
+  }
+}
diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherConfigTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherConfigTest.java
new file mode 100644
index 00000000000..925e3836c73
--- /dev/null
+++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherConfigTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.s3;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterators;
+import io.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.ConstraintViolation;
+import javax.validation.Validation;
+import javax.validation.Validator;
+import java.io.IOException;
+import java.util.Set;
+
+public class S3DataSegmentPusherConfigTest
+{
+  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+  @Test
+  public void testSerialization() throws IOException
+  {
+    String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\","
+                        + "\"disableAcl\":false,\"maxListingLength\":2000}";
+
+    S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class);
+    Assert.assertEquals(jsonConfig, jsonMapper.writeValueAsString(config));
+  }
+
+  @Test
+  public void testSerializationWithDefaults() throws IOException
+  {
+    String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"}";
+    String expectedJsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\","
+                                + "\"disableAcl\":false,\"maxListingLength\":1000}";
+
+    S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class);
+    Assert.assertEquals(expectedJsonConfig, jsonMapper.writeValueAsString(config));
+  }
+
+  @Test
+  public void testSerializationValidatingMaxListingLength() throws IOException
+  {
+    String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\","
+                        + "\"disableAcl\":false,\"maxListingLength\":-1}";
+    Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
+
+    S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class);
+    Set<ConstraintViolation<S3DataSegmentPusherConfig>> violations = validator.validate(config);
+    Assert.assertEquals(1, violations.size());
+
+    ConstraintViolation<S3DataSegmentPusherConfig> violation = Iterators.getOnlyElement(violations.iterator());
+    Assert.assertEquals("must be greater than or equal to 0", violation.getMessage());
+  }
+}
diff --git a/services/src/main/java/io/druid/cli/InsertSegment.java b/services/src/main/java/io/druid/cli/InsertSegment.java
index 7f451bce8d9..ea173316572 100644
--- a/services/src/main/java/io/druid/cli/InsertSegment.java
+++ b/services/src/main/java/io/druid/cli/InsertSegment.java
@@ -58,7 +58,7 @@ public class InsertSegment extends GuiceRunnable
   private String workingDirPath;
 
-  @Option(name = "--updateDescriptor", description = "if set to true, this tool will update loadSpec field in descriptor.json if the path in loadSpec is different from where desciptor.json was found. Default value is true", required = false)
-  private boolean updateDescriptor = true;
+  @Option(name = "--updateDescriptor", description = "if set to true, this tool will update the loadSpec field in descriptor.json if the path in loadSpec differs from where descriptor.json was found. Default value is true", required = false)
+  private String updateDescriptor = "true";
 
   private ObjectMapper mapper;
   private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@@ -97,7 +97,7 @@
 
     Set<DataSegment> segments = null;
     try {
-      segments = dataSegmentFinder.findSegments(workingDirPath, updateDescriptor);
+      segments = dataSegmentFinder.findSegments(workingDirPath, Boolean.valueOf(updateDescriptor));
    }
    catch (SegmentLoadingException e) {
      Throwables.propagate(e);
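One behavioral note on the `boolean` to `String` switch: `Boolean.valueOf` never fails on bad input, it simply maps anything other than a case-insensitive `"true"` (including `null`) to `false`. A sketch of the standard-library semantics relied on here:

```java
// java.lang.Boolean semantics, shown for reference.
Boolean.valueOf("true");        // true
Boolean.valueOf("TRUE");        // true  (comparison is case-insensitive)
Boolean.valueOf("yes");         // false (anything other than "true")
Boolean.valueOf((String) null); // false -- which is why the field above is initialized
                                // to "true", keeping the documented default.
```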