Skip empty files for local, hdfs, and cloud input sources (#9450)
* Skip empty files for local, hdfs, and cloud input sources
* split hint spec doc
* doc for skipping empty files
* fix typo; adjust tests
* unnecessary fluent iterable
* address comments
* fix test
* use the right lists
* fix test
* fix test
This commit is contained in:
parent 99095c4ac5
commit 9466ac7c9b
@@ -35,6 +35,8 @@ import java.net.URI;

 /**
  * InputEntity abstracts an input entity and knows how to read bytes from the given entity.
+ * Since the implementations of this interface assume that the given entity is not empty, the InputSources
+ * should not create InputEntities for empty entities.
  */
 @UnstableApi
 public interface InputEntity
@@ -22,6 +22,7 @@ package org.apache.druid.data.input;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;

 import javax.annotation.Nullable;
 import java.util.ArrayList;

@@ -61,6 +62,10 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
   @Override
   public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
   {
+    final Iterator<T> nonEmptyFileOnlyIterator = Iterators.filter(
+        inputIterator,
+        input -> inputAttributeExtractor.apply(input).getSize() > 0
+    );
     return new Iterator<List<T>>()
     {
       private T peeking;

@@ -68,7 +73,7 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
       @Override
       public boolean hasNext()
       {
-        return peeking != null || inputIterator.hasNext();
+        return peeking != null || nonEmptyFileOnlyIterator.hasNext();
       }

       @Override

@@ -79,9 +84,9 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
         }
         final List<T> current = new ArrayList<>();
         long splitSize = 0;
-        while (splitSize < maxSplitSize && (peeking != null || inputIterator.hasNext())) {
+        while (splitSize < maxSplitSize && (peeking != null || nonEmptyFileOnlyIterator.hasNext())) {
           if (peeking == null) {
-            peeking = inputIterator.next();
+            peeking = nonEmptyFileOnlyIterator.next();
           }
           final long size = inputAttributeExtractor.apply(peeking).getSize();
           if (current.isEmpty() || splitSize + size < maxSplitSize) {
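The change above filters out zero-size inputs with Guava's Iterators.filter before the greedy size-based grouping. Below is a minimal, self-contained sketch of the same idea; the SizedInput class and method names are illustrative assumptions, not Druid's API, and the grouping rule is only similar in spirit to MaxSizeSplitHintSpec's.

import com.google.common.collect.Iterators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class NonEmptyGroupingSketch
{
  // Hypothetical stand-in for an input file plus its size in bytes.
  static class SizedInput
  {
    final String name;
    final long size;

    SizedInput(String name, long size)
    {
      this.name = name;
      this.size = size;
    }
  }

  // Groups non-empty inputs greedily until a group would exceed maxSplitSize bytes.
  static List<List<SizedInput>> group(Iterator<SizedInput> inputs, long maxSplitSize)
  {
    // Skip empty inputs up front, mirroring the filter added to MaxSizeSplitHintSpec.
    Iterator<SizedInput> nonEmpty = Iterators.filter(inputs, input -> input.size > 0);
    List<List<SizedInput>> groups = new ArrayList<>();
    List<SizedInput> current = new ArrayList<>();
    long currentSize = 0;
    while (nonEmpty.hasNext()) {
      SizedInput next = nonEmpty.next();
      if (!current.isEmpty() && currentSize + next.size > maxSplitSize) {
        groups.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(next);
      currentSize += next.size;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }

  public static void main(String[] args)
  {
    Iterator<SizedInput> inputs = Arrays.asList(
        new SizedInput("a", 0),   // skipped
        new SizedInput("b", 3),
        new SizedInput("c", 3),
        new SizedInput("d", 0),   // skipped
        new SizedInput("e", 3)
    ).iterator();
    // With maxSplitSize = 10, the non-empty inputs b, c, e fit into a single group.
    System.out.println(group(inputs, 10).size()); // prints 1
  }
}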
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;

 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.AbstractInputSource;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;

@@ -38,6 +39,7 @@ public class InlineInputSource extends AbstractInputSource
   @JsonCreator
   public InlineInputSource(@JsonProperty("data") String data)
   {
+    Preconditions.checkArgument(data != null && !data.isEmpty(), "empty data");
     this.data = data;
   }

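With the added Preconditions.checkArgument, constructing an InlineInputSource from null or empty data now fails fast. A tiny sketch of the observable behavior, assuming only what the constructor above shows:

import org.apache.druid.data.input.impl.InlineInputSource;

public class InlineInputSourceGuardSketch
{
  public static void main(String[] args)
  {
    try {
      new InlineInputSource("");
    }
    catch (IllegalArgumentException e) {
      // checkArgument turns empty data into an IllegalArgumentException with message "empty data"
      System.out.println("rejected: " + e.getMessage());
    }
  }
}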
@@ -125,10 +125,14 @@ public class LocalInputSource extends AbstractInputSource implements SplittableI
   @VisibleForTesting
   Iterator<File> getFileIterator()
   {
-    return Iterators.concat(
-        getDirectoryListingIterator(),
-        getFilesListIterator()
-    );
+    return
+        Iterators.filter(
+            Iterators.concat(
+                getDirectoryListingIterator(),
+                getFilesListIterator()
+            ),
+            file -> file.length() > 0
+        );
   }

   private Iterator<File> getDirectoryListingIterator()
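LocalInputSource now drops zero-length files before they become splits. A rough standalone equivalent using only java.nio is sketched below; the directory path is an illustrative placeholder, not something taken from the patch.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NonEmptyLocalFiles
{
  // Lists regular files under baseDir, skipping empty ones, similar in spirit to
  // the file.length() > 0 filter added to LocalInputSource#getFileIterator.
  static List<Path> listNonEmptyFiles(Path baseDir) throws IOException
  {
    try (Stream<Path> paths = Files.walk(baseDir)) {
      return paths
          .filter(Files::isRegularFile)
          .filter(path -> {
            try {
              return Files.size(path) > 0;
            }
            catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          })
          .collect(Collectors.toList());
    }
  }

  public static void main(String[] args) throws IOException
  {
    // "/tmp/druid-input" is only an illustrative path.
    listNonEmptyFiles(Paths.get("/tmp/druid-input")).forEach(System.out::println);
  }
}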
@ -20,8 +20,8 @@
|
|||
package org.apache.druid.data.input;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import nl.jqno.equalsverifier.EqualsVerifier;
|
||||
import org.apache.commons.compress.utils.Lists;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -79,6 +79,29 @@ public class MaxSizeSplitHintSpecTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplitSkippingEmptyInputs()
|
||||
{
|
||||
final int nonEmptyInputSize = 3;
|
||||
final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(10L);
|
||||
final Function<Integer, InputFileAttribute> inputAttributeExtractor = InputFileAttribute::new;
|
||||
final IntStream dataStream = IntStream.concat(
|
||||
IntStream.concat(
|
||||
IntStream.generate(() -> 0).limit(10),
|
||||
IntStream.generate(() -> nonEmptyInputSize).limit(10)
|
||||
),
|
||||
IntStream.generate(() -> 0).limit(10)
|
||||
);
|
||||
final List<List<Integer>> splits = Lists.newArrayList(
|
||||
splitHintSpec.split(dataStream.iterator(), inputAttributeExtractor)
|
||||
);
|
||||
Assert.assertEquals(4, splits.size());
|
||||
Assert.assertEquals(3, splits.get(0).size());
|
||||
Assert.assertEquals(3, splits.get(1).size());
|
||||
Assert.assertEquals(3, splits.get(2).size());
|
||||
Assert.assertEquals(1, splits.get(3).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEquals()
|
||||
{
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.data.input.impl;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import nl.jqno.equalsverifier.EqualsVerifier;
|
||||
import org.apache.druid.data.input.InputSource;
|
||||
import org.apache.druid.data.input.InputSplit;
|
||||
|
@ -33,6 +34,9 @@ import org.junit.rules.TemporaryFolder;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.Writer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
|
@ -66,7 +70,7 @@ public class LocalInputSourceTest
|
|||
{
|
||||
final long fileSize = 15;
|
||||
final long maxSplitSize = 50;
|
||||
final Set<File> files = prepareFiles(10, fileSize);
|
||||
final Set<File> files = mockFiles(10, fileSize);
|
||||
final LocalInputSource inputSource = new LocalInputSource(null, null, files);
|
||||
final List<InputSplit<List<File>>> splits = inputSource
|
||||
.createSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize))
|
||||
|
@ -83,7 +87,7 @@ public class LocalInputSourceTest
|
|||
{
|
||||
final long fileSize = 13;
|
||||
final long maxSplitSize = 40;
|
||||
final Set<File> files = prepareFiles(10, fileSize);
|
||||
final Set<File> files = mockFiles(10, fileSize);
|
||||
final LocalInputSource inputSource = new LocalInputSource(null, null, files);
|
||||
Assert.assertEquals(
|
||||
4,
|
||||
|
@ -97,11 +101,19 @@ public class LocalInputSourceTest
|
|||
File baseDir = temporaryFolder.newFolder();
|
||||
List<File> filesInBaseDir = new ArrayList<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir));
|
||||
final File file = File.createTempFile("local-input-source", ".data", baseDir);
|
||||
try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
|
||||
writer.write("test");
|
||||
}
|
||||
filesInBaseDir.add(file);
|
||||
}
|
||||
Set<File> files = new HashSet<>(filesInBaseDir.subList(0, 5));
|
||||
for (int i = 0; i < 3; i++) {
|
||||
files.add(File.createTempFile("local-input-source", ".data", baseDir));
|
||||
final File file = File.createTempFile("local-input-source", ".data", baseDir);
|
||||
try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
|
||||
writer.write("test");
|
||||
}
|
||||
files.add(file);
|
||||
}
|
||||
Set<File> expectedFiles = new HashSet<>(filesInBaseDir);
|
||||
expectedFiles.addAll(files);
|
||||
|
@ -117,7 +129,11 @@ public class LocalInputSourceTest
|
|||
File baseDir = temporaryFolder.newFolder();
|
||||
Set<File> filesInBaseDir = new HashSet<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir));
|
||||
final File file = File.createTempFile("local-input-source", ".data", baseDir);
|
||||
try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
|
||||
writer.write("test");
|
||||
}
|
||||
filesInBaseDir.add(file);
|
||||
}
|
||||
Iterator<File> fileIterator = new LocalInputSource(baseDir, "*", null).getFileIterator();
|
||||
Set<File> actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet());
|
||||
|
@ -130,14 +146,28 @@ public class LocalInputSourceTest
|
|||
File baseDir = temporaryFolder.newFolder();
|
||||
Set<File> filesInBaseDir = new HashSet<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir));
|
||||
final File file = File.createTempFile("local-input-source", ".data", baseDir);
|
||||
try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
|
||||
writer.write("test");
|
||||
}
|
||||
filesInBaseDir.add(file);
|
||||
}
|
||||
Iterator<File> fileIterator = new LocalInputSource(null, null, filesInBaseDir).getFileIterator();
|
||||
Set<File> actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet());
|
||||
Assert.assertEquals(filesInBaseDir, actualFiles);
|
||||
}
|
||||
|
||||
private static Set<File> prepareFiles(int numFiles, long fileSize)
|
||||
@Test
|
||||
public void testFileIteratorWithEmptyFilesIteratingNonEmptyFilesOnly()
|
||||
{
|
||||
final Set<File> files = new HashSet<>(mockFiles(10, 5));
|
||||
files.addAll(mockFiles(10, 0));
|
||||
final LocalInputSource inputSource = new LocalInputSource(null, null, files);
|
||||
List<File> iteratedFiles = Lists.newArrayList(inputSource.getFileIterator());
|
||||
Assert.assertTrue(iteratedFiles.stream().allMatch(file -> file.length() > 0));
|
||||
}
|
||||
|
||||
private static Set<File> mockFiles(int numFiles, long fileSize)
|
||||
{
|
||||
final Set<File> files = new HashSet<>();
|
||||
for (int i = 0; i < numFiles; i++) {
|
||||
|
|
|
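The test helper was renamed from prepareFiles to mockFiles, and its body is cut off by the diff context above. One plausible shape for such a helper is sketched below, mocking java.io.File so length() reports the desired size; this is an assumption for illustration, not necessarily the project's exact implementation.

import org.easymock.EasyMock;

import java.io.File;
import java.util.HashSet;
import java.util.Set;

public class MockFilesSketch
{
  // Returns numFiles mocked File objects whose length() reports fileSize bytes.
  static Set<File> mockFiles(int numFiles, long fileSize)
  {
    final Set<File> files = new HashSet<>();
    for (int i = 0; i < numFiles; i++) {
      final File file = EasyMock.createNiceMock(File.class);
      EasyMock.expect(file.length()).andReturn(fileSize).anyTimes();
      EasyMock.replay(file);
      files.add(file);
    }
    return files;
  }
}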
@@ -74,7 +74,7 @@ You may want to consider the below things:

 - You may want to control the amount of input data each worker task processes. This can be
   controlled using different configurations depending on the phase in parallel ingestion (see [`partitionsSpec`](#partitionsspec) for more details).
-  For the tasks that read data from the `inputSource`, you can set the [SplitHintSpec](#splithintspec) in the `tuningConfig`.
+  For the tasks that read data from the `inputSource`, you can set the [Split hint spec](#split-hint-spec) in the `tuningConfig`.
   For the tasks that merge shuffled segments, you can set the `totalNumMergeTasks` in the `tuningConfig`.
 - The number of concurrent worker tasks in parallel ingestion is determined by `maxNumConcurrentSubTasks` in the `tuningConfig`.
   The supervisor task checks the number of current running worker tasks and creates more if it's smaller than `maxNumConcurrentSubTasks`

@@ -202,7 +202,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
 |maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
 |numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create when using a `hashed` `partitionsSpec`. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
-|splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. See [SplitHintSpec](#splithintspec) for more details.|null|`maxSize`|
+|splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. See [Split hint spec](#split-hint-spec) for more details.|size-based split hint spec|no|
 |partitionsSpec|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false, `hashed` or `single_dim` if `forceGuaranteedRollup` = true|no|
 |indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](index.md#indexspec)|null|no|
 |indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](index.md#indexspec) for possible values.|same as indexSpec|no|

@@ -219,23 +219,23 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no|
 |chatHandlerNumRetries|Retries for reporting the pushed segments in worker tasks.|5|no|

-### `splitHintSpec`
+### Split Hint Spec

-`SplitHintSpec` is used to give a hint when the supervisor task creates input splits.
+The split hint spec is used to give a hint when the supervisor task creates input splits.
 Note that each worker task processes a single input split. You can control the amount of data each worker task will read during the first phase.

-#### `MaxSizeSplitHintSpec`
+#### Size-based Split Hint Spec

-`MaxSizeSplitHintSpec` is respected by all splittable input sources except for the HTTP input source.
+The size-based split hint spec is respected by all splittable input sources except for the HTTP input source.

 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `maxSize`.|none|yes|
 |maxSplitSize|Maximum number of bytes of input files to process in a single task. If a single file is larger than this number, it will be processed by itself in a single task (Files are never split across tasks yet).|500MB|no|

-#### `SegmentsSplitHintSpec`
+#### Segments Split Hint Spec

-`SegmentsSplitHintSpec` is used only for [`DruidInputSource`](#druid-input-source) (and legacy [`IngestSegmentFirehose`](#ingestsegmentfirehose)).
+The segments split hint spec is used only for [`DruidInputSource`](#druid-input-source) (and legacy [`IngestSegmentFirehose`](#ingestsegmentfirehose)).

 |property|description|default|required?|
 |--------|-----------|-------|---------|
@ -294,7 +294,7 @@ How the worker task creates segments is:
|
|||
The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
|
||||
The task runs in 2 phases, i.e., `partial segment generation` and `partial segment merge`.
|
||||
- In the `partial segment generation` phase, just like the Map phase in MapReduce,
|
||||
the Parallel task splits the input data based on `splitHintSpec`
|
||||
the Parallel task splits the input data based on the split hint spec
|
||||
and assigns each split to a worker task. Each worker task (type `partial_index_generate`) reads the assigned split,
|
||||
and partitions rows by the time chunk from `segmentGranularity` (primary partition key) in the `granularitySpec`
|
||||
and then by the hash value of `partitionDimensions` (secondary partition key) in the `partitionsSpec`.
|
||||
|
@ -326,7 +326,7 @@ The first phase is to collect some statistics to find
|
|||
the best partitioning and the other 2 phases are to create partial segments
|
||||
and to merge them, respectively, as in hash-based partitioning.
|
||||
- In the `partial dimension distribution` phase, the Parallel task splits the input data and
|
||||
assigns them to worker tasks based on `splitHintSpec`. Each worker task (type `partial_dimension_distribution`) reads
|
||||
assigns them to worker tasks based on the split hint spec. Each worker task (type `partial_dimension_distribution`) reads
|
||||
the assigned split and builds a histogram for `partitionDimension`.
|
||||
The Parallel task collects those histograms from worker tasks and finds
|
||||
the best range partitioning based on `partitionDimension` to evenly
|
||||
|
@ -839,10 +839,12 @@ Sample specs:
|
|||
|--------|-----------|-------|---------|
|
||||
|type|This should be `s3`.|None|yes|
|
||||
|uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|properties|Properties Object for overriding the default S3 configuration. See below for more information.|None|No (defaults will be used if not given)
|
||||
|
||||
Note that the S3 input source will skip all empty objects only when `prefixes` is specified.
|
||||
|
||||
S3 Object:
|
||||
|
||||
|property|description|default|required?|
|
||||
|
@ -927,9 +929,11 @@ Sample specs:
|
|||
|--------|-----------|-------|---------|
|
||||
|type|This should be `google`.|None|yes|
|
||||
|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|objects|JSON array of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|
||||
Note that the Google Cloud Storage input source will skip all empty objects only when `prefixes` is specified.
|
||||
|
||||
Google Cloud Storage object:
|
||||
|
||||
|property|description|default|required?|
|
||||
|
@ -1004,9 +1008,11 @@ Sample specs:
|
|||
|--------|-----------|-------|---------|
|
||||
|type|This should be `google`.|None|yes|
|
||||
|uris|JSON array of URIs where Azure Blob objects to be ingested are located. Should be in form "azure://\<container>/\<path-to-file\>"|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|prefixes|JSON array of URI prefixes for the locations of Azure Blob objects to be ingested. Should be in the form "azure://\<container>/\<prefix\>"|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|prefixes|JSON array of URI prefixes for the locations of Azure Blob objects to be ingested. Should be in the form "azure://\<container>/\<prefix\>". Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|objects|JSON array of Azure Blob objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|
||||
|
||||
Note that the Azure input source will skip all empty objects only when `prefixes` is specified.
|
||||
|
||||
Azure Blob object:
|
||||
|
||||
|property|description|default|required?|
|
||||
|
@ -1092,7 +1098,7 @@ Sample specs:
|
|||
|property|description|default|required?|
|
||||
|--------|-----------|-------|---------|
|
||||
|type|This should be `hdfs`.|None|yes|
|
||||
|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths.|None|yes|
|
||||
|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|
|
||||
|
||||
You can also ingest from cloud storage using the HDFS input source.
|
||||
However, if you want to read from AWS S3 or Google Cloud Storage, consider using
|
||||
|
@ -1233,8 +1239,8 @@ Sample spec:
|
|||
|--------|-----------|---------|
|
||||
|type|This should be "local".|yes|
|
||||
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes if `baseDir` is specified|
|
||||
|baseDir|Directory to search recursively for files to be ingested. |At least one of `baseDir` or `files` should be specified|
|
||||
|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. |At least one of `baseDir` or `files` should be specified|
|
||||
|baseDir|Directory to search recursively for files to be ingested. Empty files under the `baseDir` will be skipped.|At least one of `baseDir` or `files` should be specified|
|
||||
|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. Empty files will be skipped.|At least one of `baseDir` or `files` should be specified|
|
||||
|
||||
### Druid Input Source
|
||||
|
||||
|
@ -1529,7 +1535,7 @@ This firehose will accept any type of parser, but will only utilize the list of
|
|||
|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
|
||||
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|
||||
|filter| See [Filters](../querying/filters.md)|no|
|
||||
|maxInputSegmentBytesPerTask|Deprecated. Use [SegmentsSplitHintSpec](#segmentssplithintspec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
|
||||
|maxInputSegmentBytesPerTask|Deprecated. Use [Segments Split Hint Spec](#segments-split-hint-spec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
|
||||
|
||||
<a name="sql-firehose"></a>
|
||||
|
||||
|
|
|
@@ -150,7 +150,7 @@ public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
      while (blobItemIterator.hasNext()) {
        ListBlobItemHolder blobItem = blobItemDruidFactory.create(blobItemIterator.next());
        /* skip directory objects */
-       if (blobItem.isCloudBlob()) {
+       if (blobItem.isCloudBlob() && blobItem.getCloudBlob().getBlobLength() > 0) {
          currentBlobItem = blobItem.getCloudBlob();
          return;
        }
|
|
@ -148,7 +148,7 @@ public class AzureInputSourceTest extends EasyMockSupport
|
|||
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
|
||||
EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1))
|
||||
.andReturn(CLOUD_OBJECT_LOCATION_1);
|
||||
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L);
|
||||
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
|
||||
replayAll();
|
||||
|
||||
azureInputSource = new AzureInputSource(
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.storage.azure;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.microsoft.azure.storage.ResultContinuation;
|
||||
import com.microsoft.azure.storage.ResultSegment;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
|
@ -127,10 +128,12 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
|
|||
blobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItem.class);
|
||||
cloudBlobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItemHolder.class);
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs1 = createMock(CloudBlobHolder.class);
|
||||
EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs1.getBlobLength()).andReturn(10L).anyTimes();
|
||||
|
||||
blobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItem.class);
|
||||
cloudBlobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItemHolder.class);
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs2 = createMock(CloudBlobHolder.class);
|
||||
EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs2.getBlobLength()).andReturn(10L).anyTimes();
|
||||
|
||||
blobItemPrefixWithCloudBlobsAndDirectories1 = createMock(ListBlobItem.class);
|
||||
directoryItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class);
|
||||
|
@ -138,6 +141,7 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
|
|||
blobItemPrefixWithCloudBlobsAndDirectories2 = createMock(ListBlobItem.class);
|
||||
cloudBlobItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class);
|
||||
cloudBlobDruidPrefixWithCloudBlobsAndDirectories = createMock(CloudBlobHolder.class);
|
||||
EasyMock.expect(cloudBlobDruidPrefixWithCloudBlobsAndDirectories.getBlobLength()).andReturn(10L).anyTimes();
|
||||
|
||||
blobItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItem.class);
|
||||
directoryItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItemHolder.class);
|
||||
|
@ -163,13 +167,13 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
|
|||
EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
|
||||
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true);
|
||||
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn(
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs1);
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes();
|
||||
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn(
|
||||
cloudBlobItemPrefixWithOnlyCloudBlobs1);
|
||||
|
||||
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.isCloudBlob()).andReturn(true);
|
||||
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.getCloudBlob()).andReturn(
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs2);
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs2).anyTimes();
|
||||
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs2)).andReturn(
|
||||
cloudBlobItemPrefixWithOnlyCloudBlobs2);
|
||||
|
||||
|
@ -179,7 +183,7 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
|
|||
|
||||
EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.isCloudBlob()).andReturn(true);
|
||||
EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.getCloudBlob()).andReturn(
|
||||
cloudBlobDruidPrefixWithCloudBlobsAndDirectories);
|
||||
cloudBlobDruidPrefixWithCloudBlobsAndDirectories).anyTimes();
|
||||
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories2)).andReturn(
|
||||
cloudBlobItemPrefixWithCloudBlobsAndDirectories);
|
||||
|
||||
|
@ -273,6 +277,57 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
|
|||
verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_next_emptyObjects_skipEmptyObjects() throws URISyntaxException, StorageException
|
||||
{
|
||||
EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
|
||||
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true);
|
||||
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn(
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes();
|
||||
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn(
|
||||
cloudBlobItemPrefixWithOnlyCloudBlobs1);
|
||||
|
||||
ListBlobItem emptyBlobItem = createMock(ListBlobItem.class);
|
||||
ListBlobItemHolder emptyBlobItemHolder = createMock(ListBlobItemHolder.class);
|
||||
CloudBlobHolder emptyBlobHolder = createMock(CloudBlobHolder.class);
|
||||
EasyMock.expect(emptyBlobHolder.getBlobLength()).andReturn(0L).anyTimes();
|
||||
EasyMock.expect(emptyBlobItemHolder.isCloudBlob()).andReturn(true);
|
||||
EasyMock.expect(emptyBlobItemHolder.getCloudBlob()).andReturn(emptyBlobHolder).anyTimes();
|
||||
|
||||
EasyMock.expect(blobItemDruidFactory.create(emptyBlobItem)).andReturn(emptyBlobItemHolder);
|
||||
|
||||
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
|
||||
CONTAINER1,
|
||||
PREFIX_ONLY_CLOUD_BLOBS,
|
||||
nullResultContinuationToken,
|
||||
MAX_LISTING_LENGTH
|
||||
)).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1);
|
||||
|
||||
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getContinuationToken())
|
||||
.andReturn(nullResultContinuationToken);
|
||||
ArrayList<ListBlobItem> resultBlobItemsPrefixWithOnlyCloudBlobs1 = new ArrayList<>();
|
||||
resultBlobItemsPrefixWithOnlyCloudBlobs1.add(blobItemPrefixWithOnlyCloudBlobs1);
|
||||
resultBlobItemsPrefixWithOnlyCloudBlobs1.add(emptyBlobItem);
|
||||
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getResults())
|
||||
.andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs1);
|
||||
|
||||
replayAll();
|
||||
|
||||
azureCloudBlobIterator = new AzureCloudBlobIterator(
|
||||
storage,
|
||||
blobItemDruidFactory,
|
||||
config,
|
||||
ImmutableList.of(PREFIX_ONLY_CLOUD_BLOBS_URI),
|
||||
MAX_LISTING_LENGTH
|
||||
);
|
||||
|
||||
List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(cloudBlobDruidPrefixWithOnlyCloudBlobs1);
|
||||
List<CloudBlobHolder> actualBlobItems = Lists.newArrayList(azureCloudBlobIterator);
|
||||
Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
|
||||
Assert.assertTrue(expectedBlobItems.containsAll(actualBlobItems));
|
||||
verifyAll();
|
||||
}
|
||||
|
||||
@Test(expected = NoSuchElementException.class)
|
||||
public void test_next_emptyPrefixes_throwsNoSuchElementException()
|
||||
{
|
||||
|
|
|
@@ -47,7 +47,7 @@ import java.util.stream.Stream;

 public class GoogleCloudStorageInputSource extends CloudObjectInputSource
 {
-  static final String SCHEME = "gs";
+  public static final String SCHEME = "gs";

   private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class);

@@ -117,7 +117,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource

   private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
   {
-    return new CloudObjectLocation(storageObject.getBucket(), storageObject.getName());
+    return GoogleUtils.objectToCloudObjectLocation(storageObject);
   }

   private Iterable<StorageObject> storageObjectIterable()
|
@ -20,18 +20,15 @@
|
|||
package org.apache.druid.storage.google;
|
||||
|
||||
import com.google.api.client.http.HttpResponseException;
|
||||
import com.google.api.services.storage.Storage;
|
||||
import com.google.api.services.storage.model.Objects;
|
||||
import com.google.api.services.storage.model.StorageObject;
|
||||
import com.google.common.base.Predicate;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
import org.apache.druid.java.util.common.RetryUtils;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
public class GoogleUtils
|
||||
{
|
||||
|
@ -46,11 +43,20 @@ public class GoogleUtils
|
|||
return t instanceof IOException;
|
||||
}
|
||||
|
||||
private static <T> T retryGoogleCloudStorageOperation(RetryUtils.Task<T> f) throws Exception
|
||||
static <T> T retryGoogleCloudStorageOperation(RetryUtils.Task<T> f) throws Exception
|
||||
{
|
||||
return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES);
|
||||
}
|
||||
|
||||
public static URI objectToUri(StorageObject object)
|
||||
{
|
||||
return objectToCloudObjectLocation(object).toUri(GoogleCloudStorageInputSource.SCHEME);
|
||||
}
|
||||
|
||||
public static CloudObjectLocation objectToCloudObjectLocation(StorageObject object)
|
||||
{
|
||||
return new CloudObjectLocation(object.getBucket(), object.getName());
|
||||
}
|
||||
|
||||
public static Iterator<StorageObject> lazyFetchingStorageObjectsIterator(
|
||||
final GoogleStorage storage,
|
||||
|
@ -58,91 +64,6 @@ public class GoogleUtils
|
|||
final long maxListingLength
|
||||
)
|
||||
{
|
||||
return new Iterator<StorageObject>()
|
||||
{
|
||||
private Storage.Objects.List listRequest;
|
||||
private Objects results;
|
||||
private URI currentUri;
|
||||
private String currentBucket;
|
||||
private String currentPrefix;
|
||||
private String nextPageToken;
|
||||
private Iterator<StorageObject> storageObjectsIterator;
|
||||
|
||||
{
|
||||
nextPageToken = null;
|
||||
prepareNextRequest();
|
||||
fetchNextBatch();
|
||||
}
|
||||
|
||||
private void prepareNextRequest()
|
||||
{
|
||||
try {
|
||||
currentUri = uris.next();
|
||||
currentBucket = currentUri.getAuthority();
|
||||
currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath());
|
||||
nextPageToken = null;
|
||||
listRequest = storage.list(currentBucket)
|
||||
.setPrefix(currentPrefix)
|
||||
.setMaxResults(maxListingLength);
|
||||
|
||||
}
|
||||
catch (IOException io) {
|
||||
throw new RuntimeException(io);
|
||||
}
|
||||
}
|
||||
|
||||
private void fetchNextBatch()
|
||||
{
|
||||
try {
|
||||
listRequest.setPageToken(nextPageToken);
|
||||
results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute());
|
||||
storageObjectsIterator = results.getItems().iterator();
|
||||
nextPageToken = results.getNextPageToken();
|
||||
}
|
||||
catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
{
|
||||
return storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StorageObject next()
|
||||
{
|
||||
if (!hasNext()) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
|
||||
while (storageObjectsIterator.hasNext()) {
|
||||
final StorageObject next = storageObjectsIterator.next();
|
||||
// list with prefix can return directories, but they should always end with `/`, ignore them
|
||||
if (!next.getName().endsWith("/")) {
|
||||
return next;
|
||||
}
|
||||
}
|
||||
|
||||
if (nextPageToken != null) {
|
||||
fetchNextBatch();
|
||||
} else if (uris.hasNext()) {
|
||||
prepareNextRequest();
|
||||
fetchNextBatch();
|
||||
}
|
||||
|
||||
if (!storageObjectsIterator.hasNext()) {
|
||||
throw new ISE(
|
||||
"Failed to further iterate on bucket[%s] and prefix[%s]. The last page token was [%s]",
|
||||
currentBucket,
|
||||
currentPrefix,
|
||||
nextPageToken
|
||||
);
|
||||
}
|
||||
|
||||
return next();
|
||||
}
|
||||
};
|
||||
return new ObjectStorageIterator(storage, uris, maxListingLength);
|
||||
}
|
||||
}
|
||||
|
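The refactor above moves the anonymous listing iterator into the new ObjectStorageIterator class and adds two small helpers, objectToUri and objectToCloudObjectLocation. A short sketch of how they compose, building a StorageObject the same way the new tests do (bucket and object names are illustrative):

import com.google.api.services.storage.model.StorageObject;
import org.apache.druid.storage.google.GoogleUtils;

import java.math.BigInteger;
import java.net.URI;

public class GoogleUtilsSketch
{
  public static void main(String[] args)
  {
    final StorageObject object = new StorageObject();
    object.setBucket("my-bucket");       // illustrative bucket name
    object.setName("path/to/file.json"); // illustrative object name
    object.setSize(BigInteger.valueOf(10L));

    // objectToCloudObjectLocation wraps the bucket/name pair; objectToUri renders it
    // with the "gs" scheme, e.g. gs://my-bucket/path/to/file.json
    final URI uri = GoogleUtils.objectToUri(object);
    System.out.println(uri);
  }
}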
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.google;
|
||||
|
||||
import com.google.api.services.storage.Storage;
|
||||
import com.google.api.services.storage.model.Objects;
|
||||
import com.google.api.services.storage.model.StorageObject;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
public class ObjectStorageIterator implements Iterator<StorageObject>
|
||||
{
|
||||
private final GoogleStorage storage;
|
||||
private final Iterator<URI> uris;
|
||||
private final long maxListingLength;
|
||||
|
||||
private Storage.Objects.List listRequest;
|
||||
private Objects results;
|
||||
private URI currentUri;
|
||||
private String nextPageToken;
|
||||
private Iterator<StorageObject> storageObjectsIterator;
|
||||
private StorageObject currentObject;
|
||||
|
||||
public ObjectStorageIterator(GoogleStorage storage, Iterator<URI> uris, long maxListingLength)
|
||||
{
|
||||
this.storage = storage;
|
||||
this.uris = uris;
|
||||
this.maxListingLength = maxListingLength;
|
||||
this.nextPageToken = null;
|
||||
|
||||
prepareNextRequest();
|
||||
fetchNextBatch();
|
||||
advanceStorageObject();
|
||||
}
|
||||
|
||||
private void prepareNextRequest()
|
||||
{
|
||||
try {
|
||||
currentUri = uris.next();
|
||||
String currentBucket = currentUri.getAuthority();
|
||||
String currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath());
|
||||
nextPageToken = null;
|
||||
listRequest = storage.list(currentBucket)
|
||||
.setPrefix(currentPrefix)
|
||||
.setMaxResults(maxListingLength);
|
||||
|
||||
}
|
||||
catch (IOException io) {
|
||||
throw new RuntimeException(io);
|
||||
}
|
||||
}
|
||||
|
||||
private void fetchNextBatch()
|
||||
{
|
||||
try {
|
||||
listRequest.setPageToken(nextPageToken);
|
||||
results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute());
|
||||
storageObjectsIterator = results.getItems().iterator();
|
||||
nextPageToken = results.getNextPageToken();
|
||||
}
|
||||
catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
{
|
||||
return currentObject != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StorageObject next()
|
||||
{
|
||||
if (!hasNext()) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
|
||||
final StorageObject retVal = currentObject;
|
||||
advanceStorageObject();
|
||||
return retVal;
|
||||
}
|
||||
|
||||
private void advanceStorageObject()
|
||||
{
|
||||
while (storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext()) {
|
||||
while (storageObjectsIterator.hasNext()) {
|
||||
final StorageObject next = storageObjectsIterator.next();
|
||||
// list with prefix can return directories, but they should always end with `/`, ignore them.
|
||||
// also skips empty objects.
|
||||
if (!next.getName().endsWith("/") && next.getSize().signum() > 0) {
|
||||
currentObject = next;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (nextPageToken != null) {
|
||||
fetchNextBatch();
|
||||
} else if (uris.hasNext()) {
|
||||
prepareNextRequest();
|
||||
fetchNextBatch();
|
||||
}
|
||||
}
|
||||
|
||||
// Truly nothing left to read.
|
||||
currentObject = null;
|
||||
}
|
||||
}
|
|
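ObjectStorageIterator above follows a common look-ahead pattern: advance eagerly to the next acceptable element, report hasNext() from the cached value, and hand it out in next(). Below is a generic, self-contained sketch of that pattern with a size predicate standing in for the directory and empty-object checks; the class and method names are illustrative, not Druid's.

import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Wraps another iterator and exposes only elements matching the predicate,
// using the same advance/hasNext/next structure as ObjectStorageIterator.
public class SkippingIterator<T> implements Iterator<T>
{
  private final Iterator<T> delegate;
  private final Predicate<T> accept;
  private T current;

  public SkippingIterator(Iterator<T> delegate, Predicate<T> accept)
  {
    this.delegate = delegate;
    this.accept = accept;
    advance();
  }

  private void advance()
  {
    while (delegate.hasNext()) {
      final T next = delegate.next();
      if (accept.test(next)) {
        current = next;
        return;
      }
    }
    current = null; // truly nothing left to read
  }

  @Override
  public boolean hasNext()
  {
    return current != null;
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final T retVal = current;
    advance();
    return retVal;
  }

  public static void main(String[] args)
  {
    Iterator<Integer> sizes = Arrays.asList(10, 0, 5, 0, 0, 7).iterator();
    Iterator<Integer> nonEmpty = new SkippingIterator<>(sizes, size -> size > 0);
    nonEmpty.forEachRemaining(System.out::println); // prints 10, 5, 7
  }
}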
@ -0,0 +1,297 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.google;
|
||||
|
||||
import com.google.api.client.http.HttpRequestInitializer;
|
||||
import com.google.api.client.http.HttpTransport;
|
||||
import com.google.api.client.json.JsonFactory;
|
||||
import com.google.api.services.storage.Storage;
|
||||
import com.google.api.services.storage.model.StorageObject;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import org.apache.druid.storage.google.ObjectStorageIteratorTest.MockStorage.MockObjects.MockList;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ObjectStorageIteratorTest
|
||||
{
|
||||
private static final ImmutableList<StorageObject> TEST_OBJECTS =
|
||||
ImmutableList.of(
|
||||
makeStorageObject("b", "foo", 10L),
|
||||
makeStorageObject("b", "foo/", 0L), // directory
|
||||
makeStorageObject("b", "foo/bar1", 10L),
|
||||
makeStorageObject("b", "foo/bar2", 10L),
|
||||
makeStorageObject("b", "foo/bar3", 10L),
|
||||
makeStorageObject("b", "foo/bar4", 10L),
|
||||
makeStorageObject("b", "foo/bar5", 0L), // empty object
|
||||
makeStorageObject("b", "foo/baz", 10L),
|
||||
makeStorageObject("bucketnotmine", "a/different/bucket", 10L),
|
||||
makeStorageObject("b", "foo/bar/", 0L) // another directory at the end of list
|
||||
);
|
||||
|
||||
@Test
|
||||
public void testSingleObject()
|
||||
{
|
||||
test(
|
||||
ImmutableList.of("gs://b/foo/baz"),
|
||||
ImmutableList.of("gs://b/foo/baz"),
|
||||
5
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiObjectOneKeyAtATime()
|
||||
{
|
||||
test(
|
||||
ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"),
|
||||
ImmutableList.of("gs://b/foo/"),
|
||||
1
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiObjectTwoKeysAtATime()
|
||||
{
|
||||
test(
|
||||
ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"),
|
||||
ImmutableList.of("gs://b/foo/"),
|
||||
2
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiObjectTenKeysAtATime()
|
||||
{
|
||||
test(
|
||||
ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"),
|
||||
ImmutableList.of("gs://b/foo/"),
|
||||
10
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrefixInMiddleOfKey()
|
||||
{
|
||||
test(
|
||||
ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4"),
|
||||
ImmutableList.of("gs://b/foo/bar"),
|
||||
10
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoPath()
|
||||
{
|
||||
test(
|
||||
ImmutableList.of(
|
||||
"gs://b/foo",
|
||||
"gs://b/foo/bar1",
|
||||
"gs://b/foo/bar2",
|
||||
"gs://b/foo/bar3",
|
||||
"gs://b/foo/bar4",
|
||||
"gs://b/foo/baz"
|
||||
),
|
||||
ImmutableList.of("gs://b"),
|
||||
10
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSlashPath()
|
||||
{
|
||||
test(
|
||||
ImmutableList.of(
|
||||
"gs://b/foo",
|
||||
"gs://b/foo/bar1",
|
||||
"gs://b/foo/bar2",
|
||||
"gs://b/foo/bar3",
|
||||
"gs://b/foo/bar4",
|
||||
"gs://b/foo/baz"
|
||||
),
|
||||
ImmutableList.of("gs://b/"),
|
||||
10
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDifferentBucket()
|
||||
{
|
||||
test(
|
||||
ImmutableList.of(),
|
||||
ImmutableList.of("gs://bx/foo/"),
|
||||
10
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithMultiplePrefixesReturningAllNonEmptyObjectsStartingWithOneOfPrefixes()
|
||||
{
|
||||
test(
|
||||
ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"),
|
||||
ImmutableList.of("gs://b/foo/bar", "gs://b/foo/baz"),
|
||||
10
|
||||
);
|
||||
}
|
||||
|
||||
private static void test(
|
||||
final List<String> expectedUris,
|
||||
final List<String> prefixes,
|
||||
final int maxListingLength
|
||||
)
|
||||
{
|
||||
final List<StorageObject> expectedObjects = new ArrayList<>();
|
||||
|
||||
// O(N^2) but who cares -- the list is short.
|
||||
for (final String uri : expectedUris) {
|
||||
final List<StorageObject> matches = TEST_OBJECTS
|
||||
.stream()
|
||||
.filter(storageObject -> GoogleUtils.objectToUri(storageObject).toString().equals(uri))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
expectedObjects.add(Iterables.getOnlyElement(matches));
|
||||
}
|
||||
|
||||
final List<StorageObject> actualObjects = ImmutableList.copyOf(
|
||||
GoogleUtils.lazyFetchingStorageObjectsIterator(
|
||||
makeMockClient(TEST_OBJECTS),
|
||||
prefixes.stream().map(URI::create).iterator(),
|
||||
maxListingLength
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
prefixes.toString(),
|
||||
expectedObjects.stream().map(GoogleUtils::objectToUri).collect(Collectors.toList()),
|
||||
actualObjects.stream().map(GoogleUtils::objectToUri).collect(Collectors.toList())
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes a mock Google Storage client that handles enough of "List" to test the functionality of the
|
||||
* {@link ObjectStorageIterator} class.
|
||||
*/
|
||||
private static GoogleStorage makeMockClient(final List<StorageObject> storageObjects)
|
||||
{
|
||||
return new GoogleStorage(null)
|
||||
{
|
||||
@Override
|
||||
public Storage.Objects.List list(final String bucket)
|
||||
{
|
||||
return mockList(bucket, storageObjects);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@SuppressWarnings("UnnecessaryFullyQualifiedName")
|
||||
static class MockStorage extends Storage
|
||||
{
|
||||
private MockStorage()
|
||||
{
|
||||
super(
|
||||
EasyMock.niceMock(HttpTransport.class),
|
||||
EasyMock.niceMock(JsonFactory.class),
|
||||
EasyMock.niceMock(HttpRequestInitializer.class)
|
||||
);
|
||||
}
|
||||
|
||||
private MockList mockList(String bucket, java.util.List<StorageObject> storageObjects)
|
||||
{
|
||||
return new MockObjects().mockList(bucket, storageObjects);
|
||||
}
|
||||
|
||||
class MockObjects extends Storage.Objects
|
||||
{
|
||||
private MockList mockList(String bucket, java.util.List<StorageObject> storageObjects)
|
||||
{
|
||||
return new MockList(bucket, storageObjects);
|
||||
}
|
||||
|
||||
class MockList extends Objects.List
|
||||
{
|
||||
private final java.util.List<StorageObject> storageObjects;
|
||||
|
||||
private MockList(String bucket, java.util.List<StorageObject> storageObjects)
|
||||
{
|
||||
super(bucket);
|
||||
this.storageObjects = storageObjects;
|
||||
}
|
||||
|
||||
@Override
|
||||
public com.google.api.services.storage.model.Objects execute()
|
||||
{
|
||||
// Continuation token is an index in the "objects" list.
|
||||
final String continuationToken = getPageToken();
|
||||
final int startIndex = continuationToken == null ? 0 : Integer.parseInt(continuationToken);
|
||||
|
||||
// Find matching objects.
|
||||
java.util.List<StorageObject> objects = new ArrayList<>();
|
||||
int nextIndex = -1;
|
||||
|
||||
for (int i = startIndex; i < storageObjects.size(); i++) {
|
||||
final StorageObject storageObject = storageObjects.get(i);
|
||||
|
||||
if (storageObject.getBucket().equals(getBucket())
|
||||
&& storageObject.getName().startsWith(getPrefix())) {
|
||||
|
||||
if (objects.size() == getMaxResults()) {
|
||||
// We reached our max key limit; set nextIndex (which will lead to a result with truncated = true).
|
||||
nextIndex = i;
|
||||
break;
|
||||
}
|
||||
|
||||
// Generate a summary.
|
||||
objects.add(storageObject);
|
||||
}
|
||||
}
|
||||
|
||||
com.google.api.services.storage.model.Objects retVal = new com.google.api.services.storage.model.Objects();
|
||||
retVal.setItems(objects);
|
||||
if (nextIndex >= 0) {
|
||||
retVal.setNextPageToken(String.valueOf(nextIndex));
|
||||
} else {
|
||||
retVal.setNextPageToken(null);
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static MockList mockList(String bucket, List<StorageObject> storageObjects)
|
||||
{
|
||||
return new MockStorage().mockList(bucket, storageObjects);
|
||||
}
|
||||
|
||||
private static StorageObject makeStorageObject(final String bucket, final String key, final long size)
|
||||
{
|
||||
final StorageObject summary = new StorageObject();
|
||||
summary.setBucket(bucket);
|
||||
summary.setName(key);
|
||||
summary.setSize(BigInteger.valueOf(size));
|
||||
return summary;
|
||||
}
|
||||
}
|
|
@@ -116,6 +116,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn

     return new HdfsFileInputFormat().getSplits(job)
                                     .stream()
+                                    .filter(split -> ((FileSplit) split).getLength() > 0)
                                     .map(split -> ((FileSplit) split).getPath())
                                     .collect(Collectors.toSet());
   }
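HdfsInputSource now filters out zero-length FileSplits when it enumerates paths. If you probe an HDFS directory by hand, a similar skip can be done with FileStatus.getLen(); a hedged sketch follows, with the path and configuration as placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class NonEmptyHdfsFiles
{
  // Lists files under dir, skipping empty ones, analogous to the
  // FileSplit.getLength() > 0 filter added above.
  static List<Path> listNonEmpty(FileSystem fs, Path dir) throws IOException
  {
    final List<Path> paths = new ArrayList<>();
    for (FileStatus status : fs.listStatus(dir)) {
      if (status.isFile() && status.getLen() > 0) {
        paths.add(status.getPath());
      }
    }
    return paths;
  }

  public static void main(String[] args) throws IOException
  {
    final Configuration conf = new Configuration();
    // "/druid/input" is only an illustrative path.
    final FileSystem fs = FileSystem.get(conf);
    listNonEmpty(fs, new Path("/druid/input")).forEach(System.out::println);
  }
}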
@@ -125,8 +125,8 @@ public class ObjectSummaryIterator implements Iterator<S3ObjectSummary>
     while (objectSummaryIterator.hasNext() || result.isTruncated() || prefixesIterator.hasNext()) {
       while (objectSummaryIterator.hasNext()) {
         currentObjectSummary = objectSummaryIterator.next();
-
-        if (!isDirectoryPlaceholder(currentObjectSummary)) {
+        // skips directories and empty objects
+        if (!isDirectoryPlaceholder(currentObjectSummary) && currentObjectSummary.getSize() > 0) {
           return;
         }
       }
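ObjectSummaryIterator now skips empty S3 objects in addition to directory placeholders. A small sketch of the same size check against a single ListObjectsV2Result, using the AWS SDK v1 types that appear in the tests below; the bucket and prefix are placeholders, the endsWith("/") test is only an approximation of Druid's directory-placeholder check, and pagination of truncated results is omitted.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;

public class NonEmptyS3Objects
{
  public static void main(String[] args)
  {
    final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    final ListObjectsV2Request request = new ListObjectsV2Request()
        .withBucketName("my-bucket")   // illustrative bucket
        .withPrefix("my/prefix/");     // illustrative prefix

    final ListObjectsV2Result result = s3.listObjectsV2(request);
    for (S3ObjectSummary summary : result.getObjectSummaries()) {
      // Mirrors the currentObjectSummary.getSize() > 0 check added above; keys ending
      // in "/" stand in for the directory placeholders the iterator already ignored.
      if (!summary.getKey().endsWith("/") && summary.getSize() > 0) {
        System.out.println(summary.getKey());
      }
    }
  }
}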
@ -374,8 +374,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
public void testWithPrefixesSplit()
|
||||
{
|
||||
EasyMock.reset(S3_CLIENT);
|
||||
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
|
||||
expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
|
||||
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
|
||||
expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT);
|
||||
EasyMock.replay(S3_CLIENT);
|
||||
|
||||
S3InputSource inputSource = new S3InputSource(
|
||||
|
@ -401,8 +401,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
public void testCreateSplitsWithSplitHintSpecRespectingHint()
|
||||
{
|
||||
EasyMock.reset(S3_CLIENT);
|
||||
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
|
||||
expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
|
||||
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
|
||||
expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT);
|
||||
EasyMock.replay(S3_CLIENT);
|
||||
|
||||
S3InputSource inputSource = new S3InputSource(
|
||||
|
@ -412,7 +412,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
PREFIXES,
|
||||
null,
|
||||
null);
|
||||
null
|
||||
);
|
||||
|
||||
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
|
||||
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
|
||||
|
@ -426,11 +427,40 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
EasyMock.verify(S3_CLIENT);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateSplitsWithEmptyObjectsIteratingOnlyNonEmptyObjects()
|
||||
{
|
||||
EasyMock.reset(S3_CLIENT);
|
||||
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
|
||||
expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), new byte[0]);
|
||||
EasyMock.replay(S3_CLIENT);
|
||||
|
||||
S3InputSource inputSource = new S3InputSource(
|
||||
SERVICE,
|
||||
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
|
||||
INPUT_DATA_CONFIG,
|
||||
null,
|
||||
PREFIXES,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
|
||||
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
|
||||
null
|
||||
);
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(ImmutableList.of(new CloudObjectLocation(EXPECTED_URIS.get(0)))),
|
||||
splits.map(InputSplit::get).collect(Collectors.toList())
|
||||
);
|
||||
EasyMock.verify(S3_CLIENT);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAccessDeniedWhileListingPrefix()
|
||||
{
|
||||
EasyMock.reset(S3_CLIENT);
|
||||
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
|
||||
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
|
||||
expectListObjectsAndThrowAccessDenied(EXPECTED_URIS.get(1));
|
||||
EasyMock.replay(S3_CLIENT);
|
||||
|
||||
|
@ -459,8 +489,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
public void testReader() throws IOException
|
||||
{
|
||||
EasyMock.reset(S3_CLIENT);
|
||||
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
|
||||
expectListObjects(EXPECTED_URIS.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
|
||||
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
|
||||
expectListObjects(EXPECTED_URIS.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT);
|
||||
expectGetObject(EXPECTED_URIS.get(0));
|
||||
expectGetObject(EXPECTED_URIS.get(1));
|
||||
EasyMock.replay(S3_CLIENT);
|
||||
|
@ -503,8 +533,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
public void testCompressedReader() throws IOException
|
||||
{
|
||||
EasyMock.reset(S3_CLIENT);
|
||||
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)));
|
||||
expectListObjects(EXPECTED_COMPRESSED_URIS.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)));
|
||||
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)), CONTENT);
|
||||
expectListObjects(EXPECTED_COMPRESSED_URIS.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)), CONTENT);
|
||||
expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(0));
|
||||
expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(1));
|
||||
EasyMock.replay(S3_CLIENT);
|
||||
|
@ -543,7 +573,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
EasyMock.verify(S3_CLIENT);
|
||||
}
|
||||
|
||||
private static void expectListObjects(URI prefix, List<URI> uris)
|
||||
private static void expectListObjects(URI prefix, List<URI> uris, byte[] content)
|
||||
{
|
||||
final ListObjectsV2Result result = new ListObjectsV2Result();
|
||||
result.setBucketName(prefix.getAuthority());
|
||||
|
@ -554,7 +584,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
final S3ObjectSummary objectSummary = new S3ObjectSummary();
|
||||
objectSummary.setBucketName(bucket);
|
||||
objectSummary.setKey(key);
|
||||
objectSummary.setSize(CONTENT.length);
|
||||
objectSummary.setSize(content.length);
|
||||
result.getObjectSummaries().add(objectSummary);
|
||||
}
|
||||
|
||||
|
|
|
@ -37,13 +37,15 @@ public class ObjectSummaryIteratorTest
|
|||
private static final ImmutableList<S3ObjectSummary> TEST_OBJECTS =
|
||||
ImmutableList.of(
|
||||
makeObjectSummary("b", "foo", 10L),
|
||||
makeObjectSummary("b", "foo/", 0L),
|
||||
makeObjectSummary("b", "foo/", 0L), // directory
|
||||
makeObjectSummary("b", "foo/bar1", 10L),
|
||||
makeObjectSummary("b", "foo/bar2", 10L),
|
||||
makeObjectSummary("b", "foo/bar3", 10L),
|
||||
makeObjectSummary("b", "foo/bar4", 10L),
|
||||
makeObjectSummary("b", "foo/bar5", 0L), // empty object
|
||||
makeObjectSummary("b", "foo/baz", 10L),
|
||||
makeObjectSummary("bucketnotmine", "a/different/bucket", 10L)
|
||||
makeObjectSummary("bucketnotmine", "a/different/bucket", 10L),
|
||||
makeObjectSummary("b", "foo/bar/", 0L) // another directory at the end of list
|
||||
);
|
||||
|
||||
@Test
|
||||
|
@ -140,6 +142,16 @@ public class ObjectSummaryIteratorTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithMultiplePrefixesReturningAllNonEmptyObjectsStartingWithOneOfPrefixes()
|
||||
{
|
||||
test(
|
||||
ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4", "s3://b/foo/baz"),
|
||||
ImmutableList.of("s3://b/foo/bar", "s3://b/foo/baz"),
|
||||
10
|
||||
);
|
||||
}
|
||||
|
||||
private static void test(
|
||||
final List<String> expectedUris,
|
||||
final List<String> prefixes,
|
||||
|
|
|
@ -46,10 +46,12 @@ public class S3TimestampVersionedDataFinderTest
|
|||
object0.setBucketName(bucket);
|
||||
object0.setKey(keyPrefix + "/renames-0.gz");
|
||||
object0.setLastModified(new Date(0));
|
||||
object0.setSize(10);
|
||||
|
||||
object1.setBucketName(bucket);
|
||||
object1.setKey(keyPrefix + "/renames-1.gz");
|
||||
object1.setLastModified(new Date(1));
|
||||
object1.setSize(10);
|
||||
|
||||
final ListObjectsV2Result result = new ListObjectsV2Result();
|
||||
result.getObjectSummaries().add(object0);
|
||||
|
@ -116,6 +118,7 @@ public class S3TimestampVersionedDataFinderTest
|
|||
object0.setBucketName(bucket);
|
||||
object0.setKey(keyPrefix + "/renames-0.gz");
|
||||
object0.setLastModified(new Date(0));
|
||||
object0.setSize(10);
|
||||
|
||||
final ListObjectsV2Result result = new ListObjectsV2Result();
|
||||
result.getObjectSummaries().add(object0);
|
||||
|
@ -153,6 +156,7 @@ public class S3TimestampVersionedDataFinderTest
|
|||
object0.setBucketName(bucket);
|
||||
object0.setKey(keyPrefix + "/renames-0.gz");
|
||||
object0.setLastModified(new Date(0));
|
||||
object0.setSize(10);
|
||||
|
||||
final ListObjectsV2Result result = new ListObjectsV2Result();
|
||||
result.getObjectSummaries().add(object0);
|
||||
|
|