diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java index 470e04060aa..71f14d95a3c 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java @@ -35,6 +35,8 @@ import java.net.URI; /** * InputEntity abstracts an input entity and knows how to read bytes from the given entity. + * Since the implementations of this interface assume that the given entity is not empty, the InputSources + * should not create InputEntities for empty entities. */ @UnstableApi public interface InputEntity diff --git a/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java b/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java index 4f810e2c629..4b834c14c6c 100644 --- a/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java @@ -22,6 +22,7 @@ package org.apache.druid.data.input; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; import javax.annotation.Nullable; import java.util.ArrayList; @@ -61,6 +62,10 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec @Override public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor) { + final Iterator<T> nonEmptyFileOnlyIterator = Iterators.filter( + inputIterator, + input -> inputAttributeExtractor.apply(input).getSize() > 0 + ); return new Iterator<List<T>>() { private T peeking; @@ -68,7 +73,7 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec @Override public boolean hasNext() { - return peeking != null || inputIterator.hasNext(); + return peeking != null || nonEmptyFileOnlyIterator.hasNext(); } @Override @@ -79,9 +84,9 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec } final List<T> current = new ArrayList<>(); long splitSize = 0; - while (splitSize < maxSplitSize && (peeking != null || inputIterator.hasNext())) { + while (splitSize < maxSplitSize && (peeking != null || nonEmptyFileOnlyIterator.hasNext())) { if (peeking == null) { - peeking = inputIterator.next(); + peeking = nonEmptyFileOnlyIterator.next(); } final long size = inputAttributeExtractor.apply(peeking).getSize(); if (current.isEmpty() || splitSize + size < maxSplitSize) { diff --git a/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java index a99f7cdc444..1e7b59fcf4a 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java @@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; @@ -38,6 +39,7 @@ public class InlineInputSource extends AbstractInputSource @JsonCreator public InlineInputSource(@JsonProperty("data") String data) { + Preconditions.checkArgument(data != null && !data.isEmpty(), "empty data"); this.data = data; } diff --git 
a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java index 65b0f6195b1..04b7dc7b2cd 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java @@ -125,10 +125,14 @@ public class LocalInputSource extends AbstractInputSource implements SplittableInputSource @VisibleForTesting Iterator<File> getFileIterator() { - return Iterators.concat( - getDirectoryListingIterator(), - getFilesListIterator() - ); + return + Iterators.filter( + Iterators.concat( + getDirectoryListingIterator(), + getFilesListIterator() + ), + file -> file.length() > 0 + ); } private Iterator<File> getDirectoryListingIterator() diff --git a/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java b/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java index 6e7db2992db..5ec57c0b83e 100644 --- a/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java +++ b/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java @@ -20,8 +20,8 @@ package org.apache.druid.data.input; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import nl.jqno.equalsverifier.EqualsVerifier; -import org.apache.commons.compress.utils.Lists; import org.junit.Assert; import org.junit.Test; @@ -79,6 +79,29 @@ public class MaxSizeSplitHintSpecTest } } + @Test + public void testSplitSkippingEmptyInputs() + { + final int nonEmptyInputSize = 3; + final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(10L); + final Function<Integer, InputFileAttribute> inputAttributeExtractor = InputFileAttribute::new; + final IntStream dataStream = IntStream.concat( + IntStream.concat( + IntStream.generate(() -> 0).limit(10), + IntStream.generate(() -> nonEmptyInputSize).limit(10) + ), + IntStream.generate(() -> 0).limit(10) + ); + final List<List<Integer>> splits = Lists.newArrayList( + splitHintSpec.split(dataStream.iterator(), inputAttributeExtractor) + ); + Assert.assertEquals(4, splits.size()); + Assert.assertEquals(3, splits.get(0).size()); + Assert.assertEquals(3, splits.get(1).size()); + Assert.assertEquals(3, splits.get(2).size()); + Assert.assertEquals(1, splits.get(3).size()); + } + @Test public void testEquals() { diff --git a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java index 6d7342c4180..97814355425 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSplit; @@ -33,6 +34,9 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -66,7 +70,7 @@ public class LocalInputSourceTest { final long fileSize = 15; final long maxSplitSize = 50; - final Set<File> files = prepareFiles(10, fileSize); + final Set<File> files = mockFiles(10, fileSize); final LocalInputSource inputSource = new 
LocalInputSource(null, null, files); final List>> splits = inputSource .createSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize)) @@ -83,7 +87,7 @@ public class LocalInputSourceTest { final long fileSize = 13; final long maxSplitSize = 40; - final Set files = prepareFiles(10, fileSize); + final Set files = mockFiles(10, fileSize); final LocalInputSource inputSource = new LocalInputSource(null, null, files); Assert.assertEquals( 4, @@ -97,11 +101,19 @@ public class LocalInputSourceTest File baseDir = temporaryFolder.newFolder(); List filesInBaseDir = new ArrayList<>(); for (int i = 0; i < 10; i++) { - filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir)); + final File file = File.createTempFile("local-input-source", ".data", baseDir); + try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) { + writer.write("test"); + } + filesInBaseDir.add(file); } Set files = new HashSet<>(filesInBaseDir.subList(0, 5)); for (int i = 0; i < 3; i++) { - files.add(File.createTempFile("local-input-source", ".data", baseDir)); + final File file = File.createTempFile("local-input-source", ".data", baseDir); + try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) { + writer.write("test"); + } + files.add(file); } Set expectedFiles = new HashSet<>(filesInBaseDir); expectedFiles.addAll(files); @@ -117,7 +129,11 @@ public class LocalInputSourceTest File baseDir = temporaryFolder.newFolder(); Set filesInBaseDir = new HashSet<>(); for (int i = 0; i < 10; i++) { - filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir)); + final File file = File.createTempFile("local-input-source", ".data", baseDir); + try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) { + writer.write("test"); + } + filesInBaseDir.add(file); } Iterator fileIterator = new LocalInputSource(baseDir, "*", null).getFileIterator(); Set actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet()); @@ -130,14 +146,28 @@ public class LocalInputSourceTest File baseDir = temporaryFolder.newFolder(); Set filesInBaseDir = new HashSet<>(); for (int i = 0; i < 10; i++) { - filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir)); + final File file = File.createTempFile("local-input-source", ".data", baseDir); + try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) { + writer.write("test"); + } + filesInBaseDir.add(file); } Iterator fileIterator = new LocalInputSource(null, null, filesInBaseDir).getFileIterator(); Set actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet()); Assert.assertEquals(filesInBaseDir, actualFiles); } - private static Set prepareFiles(int numFiles, long fileSize) + @Test + public void testFileIteratorWithEmptyFilesIteratingNonEmptyFilesOnly() + { + final Set files = new HashSet<>(mockFiles(10, 5)); + files.addAll(mockFiles(10, 0)); + final LocalInputSource inputSource = new LocalInputSource(null, null, files); + List iteratedFiles = Lists.newArrayList(inputSource.getFileIterator()); + Assert.assertTrue(iteratedFiles.stream().allMatch(file -> file.length() > 0)); + } + + private static Set mockFiles(int numFiles, long fileSize) { final Set files = new HashSet<>(); for (int i = 0; i < numFiles; i++) { diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 39e838a9560..a3e7d1c80ae 100644 --- a/docs/ingestion/native-batch.md +++ 
b/docs/ingestion/native-batch.md @@ -74,7 +74,7 @@ You may want to consider the below things: - You may want to control the amount of input data each worker task processes. This can be controlled using different configurations depending on the phase in parallel ingestion (see [`partitionsSpec`](#partitionsspec) for more details). - For the tasks that read data from the `inputSource`, you can set the [SplitHintSpec](#splithintspec) in the `tuningConfig`. + For the tasks that read data from the `inputSource`, you can set the [Split hint spec](#split-hint-spec) in the `tuningConfig`. For the tasks that merge shuffled segments, you can set the `totalNumMergeTasks` in the `tuningConfig`. - The number of concurrent worker tasks in parallel ingestion is determined by `maxNumConcurrentSubTasks` in the `tuningConfig`. The supervisor task checks the number of current running worker tasks and creates more if it's smaller than `maxNumConcurrentSubTasks` @@ -202,7 +202,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no| |maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no| |numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create when using a `hashed` `partitionsSpec`. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no| -|splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. See [SplitHintSpec](#splithintspec) for more details.|null|`maxSize`| +|splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. See [Split hint spec](#split-hint-spec) for more details.|size-based split hint spec|no| |partitionsSpec|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false, `hashed` or `single_dim` if `forceGuaranteedRollup` = true|no| |indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](index.md#indexspec)|null|no| |indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. 
however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](index.md#indexspec) for possible values.|same as indexSpec|no| @@ -219,23 +219,23 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no| |chatHandlerNumRetries|Retries for reporting the pushed segments in worker tasks.|5|no| -### `splitHintSpec` +### Split Hint Spec -`SplitHintSpec` is used to give a hint when the supervisor task creates input splits. +The split hint spec is used to give a hint when the supervisor task creates input splits. Note that each worker task processes a single input split. You can control the amount of data each worker task will read during the first phase. -#### `MaxSizeSplitHintSpec` +#### Size-based Split Hint Spec -`MaxSizeSplitHintSpec` is respected by all splittable input sources except for the HTTP input source. +The size-based split hint spec is respected by all splittable input sources except for the HTTP input source. |property|description|default|required?| |--------|-----------|-------|---------| |type|This should always be `maxSize`.|none|yes| |maxSplitSize|Maximum number of bytes of input files to process in a single task. If a single file is larger than this number, it will be processed by itself in a single task (Files are never split across tasks yet).|500MB|no| -#### `SegmentsSplitHintSpec` +#### Segments Split Hint Spec -`SegmentsSplitHintSpec` is used only for [`DruidInputSource`](#druid-input-source) (and legacy [`IngestSegmentFirehose`](#ingestsegmentfirehose)). +The segments split hint spec is used only for [`DruidInputSource`](#druid-input-source) (and legacy [`IngestSegmentFirehose`](#ingestsegmentfirehose)). |property|description|default|required?| |--------|-----------|-------|---------| @@ -294,7 +294,7 @@ How the worker task creates segments is: The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce). The task runs in 2 phases, i.e., `partial segment generation` and `partial segment merge`. - In the `partial segment generation` phase, just like the Map phase in MapReduce, -the Parallel task splits the input data based on `splitHintSpec` +the Parallel task splits the input data based on the split hint spec and assigns each split to a worker task. Each worker task (type `partial_index_generate`) reads the assigned split, and partitions rows by the time chunk from `segmentGranularity` (primary partition key) in the `granularitySpec` and then by the hash value of `partitionDimensions` (secondary partition key) in the `partitionsSpec`. @@ -326,7 +326,7 @@ The first phase is to collect some statistics to find the best partitioning and the other 2 phases are to create partial segments and to merge them, respectively, as in hash-based partitioning. - In the `partial dimension distribution` phase, the Parallel task splits the input data and -assigns them to worker tasks based on `splitHintSpec`. Each worker task (type `partial_dimension_distribution`) reads +assigns them to worker tasks based on the split hint spec. Each worker task (type `partial_dimension_distribution`) reads the assigned split and builds a histogram for `partitionDimension`. 
The Parallel task collects those histograms from worker tasks and finds the best range partitioning based on `partitionDimension` to evenly @@ -839,10 +839,12 @@ Sample specs: |--------|-----------|-------|---------| |type|This should be `s3`.|None|yes| |uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set| -|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| +|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set| |objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| |properties|Properties Object for overriding the default S3 configuration. See below for more information.|None|No (defaults will be used if not given) +Note that the S3 input source will skip all empty objects only when `prefixes` is specified. + S3 Object: |property|description|default|required?| @@ -927,9 +929,11 @@ Sample specs: |--------|-----------|-------|---------| |type|This should be `google`.|None|yes| |uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set| -|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| +|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set| |objects|JSON array of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| +Note that the Google Cloud Storage input source will skip all empty objects only when `prefixes` is specified. + Google Cloud Storage object: |property|description|default|required?| @@ -1004,9 +1008,11 @@ Sample specs: |--------|-----------|-------|---------| |type|This should be `google`.|None|yes| |uris|JSON array of URIs where Azure Blob objects to be ingested are located. Should be in form "azure://\/\"|None|`uris` or `prefixes` or `objects` must be set| -|prefixes|JSON array of URI prefixes for the locations of Azure Blob objects to be ingested. Should be in the form "azure://\/\"|None|`uris` or `prefixes` or `objects` must be set| +|prefixes|JSON array of URI prefixes for the locations of Azure Blob objects to be ingested. Should be in the form "azure://\/\". Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set| |objects|JSON array of Azure Blob objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| +Note that the Azure input source will skip all empty objects only when `prefixes` is specified. + Azure Blob object: |property|description|default|required?| @@ -1092,7 +1098,7 @@ Sample specs: |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be `hdfs`.|None|yes| -|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths.|None|yes| +|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. 
Empty files located under one of the given paths will be skipped.|None|yes| You can also ingest from cloud storage using the HDFS input source. However, if you want to read from AWS S3 or Google Cloud Storage, consider using @@ -1233,8 +1239,8 @@ Sample spec: |--------|-----------|---------| |type|This should be "local".|yes| |filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes if `baseDir` is specified| -|baseDir|Directory to search recursively for files to be ingested. |At least one of `baseDir` or `files` should be specified| -|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. |At least one of `baseDir` or `files` should be specified| +|baseDir|Directory to search recursively for files to be ingested. Empty files under the `baseDir` will be skipped.|At least one of `baseDir` or `files` should be specified| +|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. Empty files will be skipped.|At least one of `baseDir` or `files` should be specified| ### Druid Input Source @@ -1529,7 +1535,7 @@ This firehose will accept any type of parser, but will only utilize the list of |dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no| |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no| |filter| See [Filters](../querying/filters.md)|no| -|maxInputSegmentBytesPerTask|Deprecated. Use [SegmentsSplitHintSpec](#segmentssplithintspec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no| +|maxInputSegmentBytesPerTask|Deprecated. Use [Segments Split Hint Spec](#segments-split-hint-spec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). 
Defaults to 150MB.|no| diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java index 72d6509f5ec..c2a696c3d75 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java @@ -150,7 +150,7 @@ public class AzureCloudBlobIterator implements Iterator while (blobItemIterator.hasNext()) { ListBlobItemHolder blobItem = blobItemDruidFactory.create(blobItemIterator.next()); /* skip directory objects */ - if (blobItem.isCloudBlob()) { + if (blobItem.isCloudBlob() && blobItem.getCloudBlob().getBlobLength() > 0) { currentBlobItem = blobItem.getCloudBlob(); return; } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java index 35d1e5ff58f..8a7f16cf5be 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java @@ -148,7 +148,7 @@ public class AzureInputSourceTest extends EasyMockSupport EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator); EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1)) .andReturn(CLOUD_OBJECT_LOCATION_1); - EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L); + EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes(); replayAll(); azureInputSource = new AzureInputSource( diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java index e0fd4b45545..22bbdba158b 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java @@ -20,6 +20,7 @@ package org.apache.druid.storage.azure; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.microsoft.azure.storage.ResultContinuation; import com.microsoft.azure.storage.ResultSegment; import com.microsoft.azure.storage.StorageException; @@ -127,10 +128,12 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport blobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItem.class); cloudBlobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItemHolder.class); cloudBlobDruidPrefixWithOnlyCloudBlobs1 = createMock(CloudBlobHolder.class); + EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs1.getBlobLength()).andReturn(10L).anyTimes(); blobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItem.class); cloudBlobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItemHolder.class); cloudBlobDruidPrefixWithOnlyCloudBlobs2 = createMock(CloudBlobHolder.class); + EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs2.getBlobLength()).andReturn(10L).anyTimes(); blobItemPrefixWithCloudBlobsAndDirectories1 = createMock(ListBlobItem.class); 
directoryItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class); @@ -138,6 +141,7 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport blobItemPrefixWithCloudBlobsAndDirectories2 = createMock(ListBlobItem.class); cloudBlobItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class); cloudBlobDruidPrefixWithCloudBlobsAndDirectories = createMock(CloudBlobHolder.class); + EasyMock.expect(cloudBlobDruidPrefixWithCloudBlobsAndDirectories.getBlobLength()).andReturn(10L).anyTimes(); blobItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItem.class); directoryItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItemHolder.class); @@ -163,13 +167,13 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true); EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn( - cloudBlobDruidPrefixWithOnlyCloudBlobs1); + cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes(); EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn( cloudBlobItemPrefixWithOnlyCloudBlobs1); EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.isCloudBlob()).andReturn(true); EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.getCloudBlob()).andReturn( - cloudBlobDruidPrefixWithOnlyCloudBlobs2); + cloudBlobDruidPrefixWithOnlyCloudBlobs2).anyTimes(); EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs2)).andReturn( cloudBlobItemPrefixWithOnlyCloudBlobs2); @@ -179,7 +183,7 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.isCloudBlob()).andReturn(true); EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.getCloudBlob()).andReturn( - cloudBlobDruidPrefixWithCloudBlobsAndDirectories); + cloudBlobDruidPrefixWithCloudBlobsAndDirectories).anyTimes(); EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories2)).andReturn( cloudBlobItemPrefixWithCloudBlobsAndDirectories); @@ -273,6 +277,57 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport verifyAll(); } + @Test + public void test_next_emptyObjects_skipEmptyObjects() throws URISyntaxException, StorageException + { + EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true); + EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn( + cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes(); + EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn( + cloudBlobItemPrefixWithOnlyCloudBlobs1); + + ListBlobItem emptyBlobItem = createMock(ListBlobItem.class); + ListBlobItemHolder emptyBlobItemHolder = createMock(ListBlobItemHolder.class); + CloudBlobHolder emptyBlobHolder = createMock(CloudBlobHolder.class); + EasyMock.expect(emptyBlobHolder.getBlobLength()).andReturn(0L).anyTimes(); + EasyMock.expect(emptyBlobItemHolder.isCloudBlob()).andReturn(true); + EasyMock.expect(emptyBlobItemHolder.getCloudBlob()).andReturn(emptyBlobHolder).anyTimes(); + + EasyMock.expect(blobItemDruidFactory.create(emptyBlobItem)).andReturn(emptyBlobItemHolder); + + EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented( + CONTAINER1, + PREFIX_ONLY_CLOUD_BLOBS, + nullResultContinuationToken, + 
MAX_LISTING_LENGTH + )).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1); + + EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getContinuationToken()) + .andReturn(nullResultContinuationToken); + ArrayList resultBlobItemsPrefixWithOnlyCloudBlobs1 = new ArrayList<>(); + resultBlobItemsPrefixWithOnlyCloudBlobs1.add(blobItemPrefixWithOnlyCloudBlobs1); + resultBlobItemsPrefixWithOnlyCloudBlobs1.add(emptyBlobItem); + EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getResults()) + .andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs1); + + replayAll(); + + azureCloudBlobIterator = new AzureCloudBlobIterator( + storage, + blobItemDruidFactory, + config, + ImmutableList.of(PREFIX_ONLY_CLOUD_BLOBS_URI), + MAX_LISTING_LENGTH + ); + + List expectedBlobItems = ImmutableList.of(cloudBlobDruidPrefixWithOnlyCloudBlobs1); + List actualBlobItems = Lists.newArrayList(azureCloudBlobIterator); + Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size()); + Assert.assertTrue(expectedBlobItems.containsAll(actualBlobItems)); + verifyAll(); + } + @Test(expected = NoSuchElementException.class) public void test_next_emptyPrefixes_throwsNoSuchElementException() { diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index b1ae3a4ff9d..9d37b3f7f50 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -47,7 +47,7 @@ import java.util.stream.Stream; public class GoogleCloudStorageInputSource extends CloudObjectInputSource { - static final String SCHEME = "gs"; + public static final String SCHEME = "gs"; private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class); @@ -117,7 +117,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject) { - return new CloudObjectLocation(storageObject.getBucket(), storageObject.getName()); + return GoogleUtils.objectToCloudObjectLocation(storageObject); } private Iterable storageObjectIterable() diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 9fbb23f040c..2c181785065 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -20,18 +20,15 @@ package org.apache.druid.storage.google; import com.google.api.client.http.HttpResponseException; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Predicate; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.data.input.google.GoogleCloudStorageInputSource; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.RetryUtils; -import org.apache.druid.java.util.common.StringUtils; import java.io.IOException; import 
java.net.URI; import java.util.Iterator; -import java.util.NoSuchElementException; public class GoogleUtils { @@ -46,11 +43,20 @@ public class GoogleUtils return t instanceof IOException; } - private static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception + static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception { return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES); } + public static URI objectToUri(StorageObject object) + { + return objectToCloudObjectLocation(object).toUri(GoogleCloudStorageInputSource.SCHEME); + } + + public static CloudObjectLocation objectToCloudObjectLocation(StorageObject object) + { + return new CloudObjectLocation(object.getBucket(), object.getName()); + } public static Iterator lazyFetchingStorageObjectsIterator( final GoogleStorage storage, @@ -58,91 +64,6 @@ public class GoogleUtils final long maxListingLength ) { - return new Iterator() - { - private Storage.Objects.List listRequest; - private Objects results; - private URI currentUri; - private String currentBucket; - private String currentPrefix; - private String nextPageToken; - private Iterator storageObjectsIterator; - - { - nextPageToken = null; - prepareNextRequest(); - fetchNextBatch(); - } - - private void prepareNextRequest() - { - try { - currentUri = uris.next(); - currentBucket = currentUri.getAuthority(); - currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath()); - nextPageToken = null; - listRequest = storage.list(currentBucket) - .setPrefix(currentPrefix) - .setMaxResults(maxListingLength); - - } - catch (IOException io) { - throw new RuntimeException(io); - } - } - - private void fetchNextBatch() - { - try { - listRequest.setPageToken(nextPageToken); - results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute()); - storageObjectsIterator = results.getItems().iterator(); - nextPageToken = results.getNextPageToken(); - } - catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - @Override - public boolean hasNext() - { - return storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext(); - } - - @Override - public StorageObject next() - { - if (!hasNext()) { - throw new NoSuchElementException(); - } - - while (storageObjectsIterator.hasNext()) { - final StorageObject next = storageObjectsIterator.next(); - // list with prefix can return directories, but they should always end with `/`, ignore them - if (!next.getName().endsWith("/")) { - return next; - } - } - - if (nextPageToken != null) { - fetchNextBatch(); - } else if (uris.hasNext()) { - prepareNextRequest(); - fetchNextBatch(); - } - - if (!storageObjectsIterator.hasNext()) { - throw new ISE( - "Failed to further iterate on bucket[%s] and prefix[%s]. The last page token was [%s]", - currentBucket, - currentPrefix, - nextPageToken - ); - } - - return next(); - } - }; + return new ObjectStorageIterator(storage, uris, maxListingLength); } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java new file mode 100644 index 00000000000..10275112f6f --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import org.apache.druid.java.util.common.StringUtils; + +import java.io.IOException; +import java.net.URI; +import java.util.Iterator; +import java.util.NoSuchElementException; + +public class ObjectStorageIterator implements Iterator<StorageObject> +{ + private final GoogleStorage storage; + private final Iterator<URI> uris; + private final long maxListingLength; + + private Storage.Objects.List listRequest; + private Objects results; + private URI currentUri; + private String nextPageToken; + private Iterator<StorageObject> storageObjectsIterator; + private StorageObject currentObject; + + public ObjectStorageIterator(GoogleStorage storage, Iterator<URI> uris, long maxListingLength) + { + this.storage = storage; + this.uris = uris; + this.maxListingLength = maxListingLength; + this.nextPageToken = null; + + prepareNextRequest(); + fetchNextBatch(); + advanceStorageObject(); + } + + private void prepareNextRequest() + { + try { + currentUri = uris.next(); + String currentBucket = currentUri.getAuthority(); + String currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath()); + nextPageToken = null; + listRequest = storage.list(currentBucket) + .setPrefix(currentPrefix) + .setMaxResults(maxListingLength); + + } + catch (IOException io) { + throw new RuntimeException(io); + } + } + + private void fetchNextBatch() + { + try { + listRequest.setPageToken(nextPageToken); + results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute()); + storageObjectsIterator = results.getItems().iterator(); + nextPageToken = results.getNextPageToken(); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + public boolean hasNext() + { + return currentObject != null; + } + + @Override + public StorageObject next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final StorageObject retVal = currentObject; + advanceStorageObject(); + return retVal; + } + + private void advanceStorageObject() + { + while (storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext()) { + while (storageObjectsIterator.hasNext()) { + final StorageObject next = storageObjectsIterator.next(); + // list with prefix can return directories, but they should always end with `/`, ignore them. + // also skips empty objects. + if (!next.getName().endsWith("/") && next.getSize().signum() > 0) { + currentObject = next; + return; + } + } + + if (nextPageToken != null) { + fetchNextBatch(); + } else if (uris.hasNext()) { + prepareNextRequest(); + fetchNextBatch(); + } + } + + // Truly nothing left to read. 
+ currentObject = null; + } +} diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java new file mode 100644 index 00000000000..4d1504f842c --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.StorageObject; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.druid.storage.google.ObjectStorageIteratorTest.MockStorage.MockObjects.MockList; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.math.BigInteger; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class ObjectStorageIteratorTest +{ + private static final ImmutableList TEST_OBJECTS = + ImmutableList.of( + makeStorageObject("b", "foo", 10L), + makeStorageObject("b", "foo/", 0L), // directory + makeStorageObject("b", "foo/bar1", 10L), + makeStorageObject("b", "foo/bar2", 10L), + makeStorageObject("b", "foo/bar3", 10L), + makeStorageObject("b", "foo/bar4", 10L), + makeStorageObject("b", "foo/bar5", 0L), // empty object + makeStorageObject("b", "foo/baz", 10L), + makeStorageObject("bucketnotmine", "a/different/bucket", 10L), + makeStorageObject("b", "foo/bar/", 0L) // another directory at the end of list + ); + + @Test + public void testSingleObject() + { + test( + ImmutableList.of("gs://b/foo/baz"), + ImmutableList.of("gs://b/foo/baz"), + 5 + ); + } + + @Test + public void testMultiObjectOneKeyAtATime() + { + test( + ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"), + ImmutableList.of("gs://b/foo/"), + 1 + ); + } + + @Test + public void testMultiObjectTwoKeysAtATime() + { + test( + ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"), + ImmutableList.of("gs://b/foo/"), + 2 + ); + } + + @Test + public void testMultiObjectTenKeysAtATime() + { + test( + ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"), + ImmutableList.of("gs://b/foo/"), + 10 + ); + } + + @Test + public void testPrefixInMiddleOfKey() + { + test( + 
ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4"), + ImmutableList.of("gs://b/foo/bar"), + 10 + ); + } + + @Test + public void testNoPath() + { + test( + ImmutableList.of( + "gs://b/foo", + "gs://b/foo/bar1", + "gs://b/foo/bar2", + "gs://b/foo/bar3", + "gs://b/foo/bar4", + "gs://b/foo/baz" + ), + ImmutableList.of("gs://b"), + 10 + ); + } + + @Test + public void testSlashPath() + { + test( + ImmutableList.of( + "gs://b/foo", + "gs://b/foo/bar1", + "gs://b/foo/bar2", + "gs://b/foo/bar3", + "gs://b/foo/bar4", + "gs://b/foo/baz" + ), + ImmutableList.of("gs://b/"), + 10 + ); + } + + @Test + public void testDifferentBucket() + { + test( + ImmutableList.of(), + ImmutableList.of("gs://bx/foo/"), + 10 + ); + } + + @Test + public void testWithMultiplePrefixesReturningAllNonEmptyObjectsStartingWithOneOfPrefixes() + { + test( + ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"), + ImmutableList.of("gs://b/foo/bar", "gs://b/foo/baz"), + 10 + ); + } + + private static void test( + final List expectedUris, + final List prefixes, + final int maxListingLength + ) + { + final List expectedObjects = new ArrayList<>(); + + // O(N^2) but who cares -- the list is short. + for (final String uri : expectedUris) { + final List matches = TEST_OBJECTS + .stream() + .filter(storageObject -> GoogleUtils.objectToUri(storageObject).toString().equals(uri)) + .collect(Collectors.toList()); + + expectedObjects.add(Iterables.getOnlyElement(matches)); + } + + final List actualObjects = ImmutableList.copyOf( + GoogleUtils.lazyFetchingStorageObjectsIterator( + makeMockClient(TEST_OBJECTS), + prefixes.stream().map(URI::create).iterator(), + maxListingLength + ) + ); + + Assert.assertEquals( + prefixes.toString(), + expectedObjects.stream().map(GoogleUtils::objectToUri).collect(Collectors.toList()), + actualObjects.stream().map(GoogleUtils::objectToUri).collect(Collectors.toList()) + ); + } + + /** + * Makes a mock Google Storage client that handles enough of "List" to test the functionality of the + * {@link ObjectStorageIterator} class. + */ + private static GoogleStorage makeMockClient(final List storageObjects) + { + return new GoogleStorage(null) + { + @Override + public Storage.Objects.List list(final String bucket) + { + return mockList(bucket, storageObjects); + } + }; + } + + @SuppressWarnings("UnnecessaryFullyQualifiedName") + static class MockStorage extends Storage + { + private MockStorage() + { + super( + EasyMock.niceMock(HttpTransport.class), + EasyMock.niceMock(JsonFactory.class), + EasyMock.niceMock(HttpRequestInitializer.class) + ); + } + + private MockList mockList(String bucket, java.util.List storageObjects) + { + return new MockObjects().mockList(bucket, storageObjects); + } + + class MockObjects extends Storage.Objects + { + private MockList mockList(String bucket, java.util.List storageObjects) + { + return new MockList(bucket, storageObjects); + } + + class MockList extends Objects.List + { + private final java.util.List storageObjects; + + private MockList(String bucket, java.util.List storageObjects) + { + super(bucket); + this.storageObjects = storageObjects; + } + + @Override + public com.google.api.services.storage.model.Objects execute() + { + // Continuation token is an index in the "objects" list. + final String continuationToken = getPageToken(); + final int startIndex = continuationToken == null ? 0 : Integer.parseInt(continuationToken); + + // Find matching objects. 
+ java.util.List objects = new ArrayList<>(); + int nextIndex = -1; + + for (int i = startIndex; i < storageObjects.size(); i++) { + final StorageObject storageObject = storageObjects.get(i); + + if (storageObject.getBucket().equals(getBucket()) + && storageObject.getName().startsWith(getPrefix())) { + + if (objects.size() == getMaxResults()) { + // We reached our max key limit; set nextIndex (which will lead to a result with truncated = true). + nextIndex = i; + break; + } + + // Generate a summary. + objects.add(storageObject); + } + } + + com.google.api.services.storage.model.Objects retVal = new com.google.api.services.storage.model.Objects(); + retVal.setItems(objects); + if (nextIndex >= 0) { + retVal.setNextPageToken(String.valueOf(nextIndex)); + } else { + retVal.setNextPageToken(null); + } + return retVal; + } + } + } + } + + private static MockList mockList(String bucket, List storageObjects) + { + return new MockStorage().mockList(bucket, storageObjects); + } + + private static StorageObject makeStorageObject(final String bucket, final String key, final long size) + { + final StorageObject summary = new StorageObject(); + summary.setBucket(bucket); + summary.setName(key); + summary.setSize(BigInteger.valueOf(size)); + return summary; + } +} diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java index 661a07894e2..be7f3c88340 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java @@ -116,6 +116,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn return new HdfsFileInputFormat().getSplits(job) .stream() + .filter(split -> ((FileSplit) split).getLength() > 0) .map(split -> ((FileSplit) split).getPath()) .collect(Collectors.toSet()); } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java index 0e791cd40e5..a515c153b0e 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java @@ -125,8 +125,8 @@ public class ObjectSummaryIterator implements Iterator while (objectSummaryIterator.hasNext() || result.isTruncated() || prefixesIterator.hasNext()) { while (objectSummaryIterator.hasNext()) { currentObjectSummary = objectSummaryIterator.next(); - - if (!isDirectoryPlaceholder(currentObjectSummary)) { + // skips directories and empty objects + if (!isDirectoryPlaceholder(currentObjectSummary) && currentObjectSummary.getSize() > 0) { return; } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index 59b6303eb28..122f4fd6a56 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -374,8 +374,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest public void testWithPrefixesSplit() { 
EasyMock.reset(S3_CLIENT); - expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); - expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); + expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT); + expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT); EasyMock.replay(S3_CLIENT); S3InputSource inputSource = new S3InputSource( @@ -401,8 +401,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest public void testCreateSplitsWithSplitHintSpecRespectingHint() { EasyMock.reset(S3_CLIENT); - expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); - expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); + expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT); + expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT); EasyMock.replay(S3_CLIENT); S3InputSource inputSource = new S3InputSource( @@ -412,7 +412,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest null, PREFIXES, null, - null); + null + ); Stream>> splits = inputSource.createSplits( new JsonInputFormat(JSONPathSpec.DEFAULT, null), @@ -426,11 +427,40 @@ public class S3InputSourceTest extends InitializedNullHandlingTest EasyMock.verify(S3_CLIENT); } + @Test + public void testCreateSplitsWithEmptyObjectsIteratingOnlyNonEmptyObjects() + { + EasyMock.reset(S3_CLIENT); + expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT); + expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), new byte[0]); + EasyMock.replay(S3_CLIENT); + + S3InputSource inputSource = new S3InputSource( + SERVICE, + SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER, + INPUT_DATA_CONFIG, + null, + PREFIXES, + null, + null + ); + + Stream>> splits = inputSource.createSplits( + new JsonInputFormat(JSONPathSpec.DEFAULT, null), + null + ); + Assert.assertEquals( + ImmutableList.of(ImmutableList.of(new CloudObjectLocation(EXPECTED_URIS.get(0)))), + splits.map(InputSplit::get).collect(Collectors.toList()) + ); + EasyMock.verify(S3_CLIENT); + } + @Test public void testAccessDeniedWhileListingPrefix() { EasyMock.reset(S3_CLIENT); - expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); + expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT); expectListObjectsAndThrowAccessDenied(EXPECTED_URIS.get(1)); EasyMock.replay(S3_CLIENT); @@ -459,8 +489,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest public void testReader() throws IOException { EasyMock.reset(S3_CLIENT); - expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); - expectListObjects(EXPECTED_URIS.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); + expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT); + expectListObjects(EXPECTED_URIS.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT); expectGetObject(EXPECTED_URIS.get(0)); expectGetObject(EXPECTED_URIS.get(1)); EasyMock.replay(S3_CLIENT); @@ -503,8 +533,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest public void testCompressedReader() throws IOException { EasyMock.reset(S3_CLIENT); - expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0))); - expectListObjects(EXPECTED_COMPRESSED_URIS.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1))); + expectListObjects(PREFIXES.get(0), 
ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)), CONTENT); + expectListObjects(EXPECTED_COMPRESSED_URIS.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)), CONTENT); expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(0)); expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(1)); EasyMock.replay(S3_CLIENT); @@ -543,7 +573,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest EasyMock.verify(S3_CLIENT); } - private static void expectListObjects(URI prefix, List uris) + private static void expectListObjects(URI prefix, List uris, byte[] content) { final ListObjectsV2Result result = new ListObjectsV2Result(); result.setBucketName(prefix.getAuthority()); @@ -554,7 +584,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest final S3ObjectSummary objectSummary = new S3ObjectSummary(); objectSummary.setBucketName(bucket); objectSummary.setKey(key); - objectSummary.setSize(CONTENT.length); + objectSummary.setSize(content.length); result.getObjectSummaries().add(objectSummary); } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java index d63409cee19..ea2ca4af26c 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java @@ -37,13 +37,15 @@ public class ObjectSummaryIteratorTest private static final ImmutableList TEST_OBJECTS = ImmutableList.of( makeObjectSummary("b", "foo", 10L), - makeObjectSummary("b", "foo/", 0L), + makeObjectSummary("b", "foo/", 0L), // directory makeObjectSummary("b", "foo/bar1", 10L), makeObjectSummary("b", "foo/bar2", 10L), makeObjectSummary("b", "foo/bar3", 10L), makeObjectSummary("b", "foo/bar4", 10L), + makeObjectSummary("b", "foo/bar5", 0L), // empty object makeObjectSummary("b", "foo/baz", 10L), - makeObjectSummary("bucketnotmine", "a/different/bucket", 10L) + makeObjectSummary("bucketnotmine", "a/different/bucket", 10L), + makeObjectSummary("b", "foo/bar/", 0L) // another directory at the end of list ); @Test @@ -140,6 +142,16 @@ public class ObjectSummaryIteratorTest ); } + @Test + public void testWithMultiplePrefixesReturningAllNonEmptyObjectsStartingWithOneOfPrefixes() + { + test( + ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4", "s3://b/foo/baz"), + ImmutableList.of("s3://b/foo/bar", "s3://b/foo/baz"), + 10 + ); + } + private static void test( final List expectedUris, final List prefixes, diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java index 8e144c2340f..f839d3df4c7 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java @@ -46,10 +46,12 @@ public class S3TimestampVersionedDataFinderTest object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); object0.setLastModified(new Date(0)); + object0.setSize(10); object1.setBucketName(bucket); object1.setKey(keyPrefix + "/renames-1.gz"); object1.setLastModified(new Date(1)); + object1.setSize(10); final 
ListObjectsV2Result result = new ListObjectsV2Result(); result.getObjectSummaries().add(object0); @@ -116,6 +118,7 @@ public class S3TimestampVersionedDataFinderTest object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); object0.setLastModified(new Date(0)); + object0.setSize(10); final ListObjectsV2Result result = new ListObjectsV2Result(); result.getObjectSummaries().add(object0); @@ -153,6 +156,7 @@ public class S3TimestampVersionedDataFinderTest object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); object0.setLastModified(new Date(0)); + object0.setSize(10); final ListObjectsV2Result result = new ListObjectsV2Result(); result.getObjectSummaries().add(object0);
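For reviewers, the snippet below is a minimal sketch (not part of the patch) of the behavior the MaxSizeSplitHintSpec change introduces: zero-size inputs are filtered out before inputs are grouped into splits, so empty files never reach a worker task. It only assumes the classes touched above (MaxSizeSplitHintSpec, InputFileAttribute) plus Guava; the class name is illustrative.

import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;

import java.util.Iterator;
import java.util.List;

public class EmptyInputSkipSketch
{
  public static void main(String[] args)
  {
    // maxSplitSize = 10 bytes; the input sizes below include zero-byte entries.
    final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(10L);
    final Iterator<List<Long>> splits = splitHintSpec.split(
        ImmutableList.of(3L, 0L, 3L, 3L, 0L, 3L).iterator(),
        InputFileAttribute::new
    );
    while (splits.hasNext()) {
      // Prints [3, 3, 3] and then [3]; the zero-byte inputs are skipped entirely.
      System.out.println(splits.next());
    }
  }
}

The same size check backs the new filters in LocalInputSource, the S3/GCS/Azure object iterators, and HdfsInputSource, which is what the "empty files/objects will be skipped" notes added to native-batch.md describe.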