From b6ad790dc7f49762ef0531219838dc0ce2e4a183 Mon Sep 17 00:00:00 2001
From: Atul Mohan
Date: Tue, 15 Sep 2020 18:25:35 -0500
Subject: [PATCH] Support combining inputsource for parallel ingestion (#10387)

* Add combining inputsource

* Fix documentation

Co-authored-by: Atul Mohan
---
 .../apache/druid/data/input/InputSource.java  |   4 +-
 .../data/input/impl/CombiningInputSource.java | 132 ++++++++
 .../input/impl/CombiningInputSourceTest.java  | 313 ++++++++++++++++++
 docs/ingestion/native-batch.md                |  42 +++
 .../tutorial/updates-append-index.json        |  85 ++---
 ...CombiningInputSourceParallelIndexTest.java | 120 +++++++
 ...ning_input_source_index_parallel_task.json |  98 ++++++
 7 files changed, 752 insertions(+), 42 deletions(-)
 create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java
 create mode 100644 core/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java
 create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java
 create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_combining_input_source_index_parallel_task.json

diff --git a/core/src/main/java/org/apache/druid/data/input/InputSource.java b/core/src/main/java/org/apache/druid/data/input/InputSource.java
index b0144c51eef..ba0224e8b0b 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputSource.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputSource.java
@@ -22,6 +22,7 @@ package org.apache.druid.data.input;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.impl.CombiningInputSource;
 import org.apache.druid.data.input.impl.HttpInputSource;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.LocalInputSource;
@@ -50,7 +51,8 @@ import java.io.File;
 @JsonSubTypes(value = {
     @Type(name = "local", value = LocalInputSource.class),
     @Type(name = "http", value = HttpInputSource.class),
-    @Type(name = "inline", value = InlineInputSource.class)
+    @Type(name = "inline", value = InlineInputSource.class),
+    @Type(name = "combining", value = CombiningInputSource.class)
 })
 public interface InputSource
 {
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java
new file mode 100644
index 00000000000..0b8201e312c
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.Pair;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+/**
+ * InputSource that combines data from multiple inputSources. The delegate inputSources must be splittable.
+ * The splits for this inputSource are created from {@link SplittableInputSource#createSplits} of the delegate
+ * inputSources. Each inputSplit is paired up with its respective delegate inputSource so that, during a split,
+ * {@link SplittableInputSource#withSplit} is called against the correct inputSource for each inputSplit.
+ * This inputSource presently supports only a single {@link InputFormat}.
+ */
+public class CombiningInputSource extends AbstractInputSource implements SplittableInputSource
+{
+  private final List<SplittableInputSource> delegates;
+
+  @JsonCreator
+  public CombiningInputSource(
+      @JsonProperty("delegates") List<SplittableInputSource> delegates
+  )
+  {
+    Preconditions.checkArgument(
+        delegates != null && !delegates.isEmpty(),
+        "Must specify at least one delegate inputSource"
+    );
+    this.delegates = delegates;
+  }
+
+  @JsonProperty
+  public List<SplittableInputSource> getDelegates()
+  {
+    return delegates;
+  }
+
+  @Override
+  public Stream<InputSplit> createSplits(
+      InputFormat inputFormat,
+      @Nullable SplitHintSpec splitHintSpec
+  )
+  {
+    // Flatten the splits of all delegates, tagging each split with the delegate that produced it.
+    return delegates.stream().flatMap(inputSource -> {
+      try {
+        return inputSource.createSplits(inputFormat, splitHintSpec)
+                          .map(inputsplit -> new InputSplit<>(Pair.of(inputSource, inputsplit)));
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Override
+  public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+  {
+    return delegates.stream().mapToInt(inputSource -> {
+      try {
+        return inputSource.estimateNumSplits(inputFormat, splitHintSpec);
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).sum();
+  }
+
+  @Override
+  public InputSource withSplit(InputSplit split)
+  {
+    // Unpack the (delegate, split) pair created in createSplits and hand the split back to its own delegate.
+    Pair<SplittableInputSource, InputSplit> inputSourceWithSplit = (Pair) split.get();
+    return inputSourceWithSplit.lhs.withSplit(inputSourceWithSplit.rhs);
+  }
+
+  @Override
+  public boolean needsFormat()
+  {
+    // This is called only when ParallelIndexIngestionSpec needs to decide whether an inputFormat or a parserSpec
+    // is required. So if at least one of the delegate inputSources needs a format, we set this to true.
+    // All other needsFormat calls are made against the delegate inputSources themselves.
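+    // For example, among the built-in delegates, an "http" delegate needs an inputFormat while a
+    // "druid" (segment-reading) delegate does not.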
+    return delegates.stream().anyMatch(SplittableInputSource::needsFormat);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CombiningInputSource that = (CombiningInputSource) o;
+    return delegates.equals(that.delegates);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(delegates);
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java
new file mode 100644
index 00000000000..1db194baec1
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.Pair;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class CombiningInputSourceTest
+{
+  @Test
+  public void testSerde() throws IOException
+  {
+    final ObjectMapper mapper = new ObjectMapper();
+    mapper.registerModule(
+        new SimpleModule("test-module").registerSubtypes(TestFileInputSource.class, TestUriInputSource.class)
+    );
+    final TestFileInputSource fileSource =
+        new TestFileInputSource(ImmutableList.of(new File("myFile").getAbsoluteFile()));
+    final TestUriInputSource uriInputSource =
+        new TestUriInputSource(ImmutableList.of(URI.create("http://test.com/http-test")));
+    final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
+        fileSource,
+        uriInputSource
+    ));
+    final byte[] json = mapper.writeValueAsBytes(combiningInputSource);
+    final CombiningInputSource fromJson = (CombiningInputSource) mapper.readValue(
+        json,
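+        // Deserialize via the InputSource base type to exercise polymorphic resolution of the "combining" subtype.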
+        InputSource.class
+    );
+    Assert.assertEquals(combiningInputSource, fromJson);
+  }
+
+  @Test
+  public void testEstimateNumSplits()
+  {
+    final File file = EasyMock.niceMock(File.class);
+    EasyMock.expect(file.length()).andReturn(5L).anyTimes();
+    EasyMock.replay(file);
+    final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
+    final TestUriInputSource uriInputSource = new TestUriInputSource(
+        ImmutableList.of(
+            URI.create("http://test.com/http-test1"),
+            URI.create("http://test.com/http-test2"),
+            URI.create("http://test.com/http-test3")
+        )
+    );
+    final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
+        fileSource,
+        uriInputSource
+    ));
+    Assert.assertEquals(
+        6,
+        combiningInputSource.estimateNumSplits(
+            new NoopInputFormat(),
+            new MaxSizeSplitHintSpec(
+                new HumanReadableBytes(5L),
+                null
+            )
+        )
+    );
+  }
+
+  @Test
+  public void testCreateSplits()
+  {
+    final File file = EasyMock.niceMock(File.class);
+    EasyMock.expect(file.length()).andReturn(30L).anyTimes();
+    EasyMock.replay(file);
+    final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
+    final TestUriInputSource uriInputSource = new TestUriInputSource(
+        ImmutableList.of(
+            URI.create("http://test.com/http-test3"),
+            URI.create("http://test.com/http-test4"),
+            URI.create("http://test.com/http-test5")
+        )
+    );
+    final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
+        fileSource,
+        uriInputSource
+    ));
+    List<InputSplit> combinedInputSplits = combiningInputSource.createSplits(
+        new NoopInputFormat(),
+        new MaxSizeSplitHintSpec(
+            new HumanReadableBytes(5L),
+            null
+        )
+    ).collect(Collectors.toList());
+    Assert.assertEquals(6, combinedInputSplits.size());
+    // The first three splits come from the file delegate, in delegate order.
+    for (int i = 0; i < 3; i++) {
+      Pair<SplittableInputSource, InputSplit> splitPair = (Pair) combinedInputSplits.get(i).get();
+      InputSplit<File> fileSplits = splitPair.rhs;
+      Assert.assertTrue(splitPair.lhs instanceof TestFileInputSource);
+      Assert.assertEquals(5, fileSplits.get().length());
+    }
+    // The remaining splits come from the URI delegate.
+    for (int i = 3; i < combinedInputSplits.size(); i++) {
+      Pair<SplittableInputSource, InputSplit> splitPair = (Pair) combinedInputSplits.get(i).get();
+      InputSplit<URI> uriSplits = splitPair.rhs;
+      Assert.assertTrue(splitPair.lhs instanceof TestUriInputSource);
+      Assert.assertEquals(URI.create("http://test.com/http-test" + i), uriSplits.get());
+    }
+  }
+
+  @Test
+  public void testWithSplits()
+  {
+    final TestUriInputSource uriInputSource = new TestUriInputSource(
+        ImmutableList.of(URI.create("http://test.com/http-test1"))
+    );
+    final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
+        uriInputSource
+    ));
+    InputSplit<URI> testUriSplit = new InputSplit<>(URI.create("http://test.com/http-test1"));
+    TestUriInputSource uriInputSourceWithSplit = (TestUriInputSource) combiningInputSource.withSplit(
+        new InputSplit<>(Pair.of(uriInputSource, testUriSplit))
+    );
+    Assert.assertEquals(uriInputSource, uriInputSourceWithSplit);
+  }
+
+  @Test
+  public void testNeedsFormat()
+  {
+    final TestUriInputSource uriInputSource = new TestUriInputSource(
+        ImmutableList.of(URI.create("http://test.com/http-test1"))
+    );
+    final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
+    final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
+        uriInputSource,
+        fileSource
+    ));
+    // TestFileInputSource needs a format, so the combining source must report true.
+    Assert.assertTrue(combiningInputSource.needsFormat());
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(CombiningInputSource.class)
+                  .withNonnullFields("delegates")
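+                  // usingGetClass() matches the getClass()-based equals() in CombiningInputSource.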
+                  .usingGetClass()
+                  .verify();
+  }
+
+  private static List<File> generateFiles(int numFiles)
+  {
+    final List<File> files = new ArrayList<>();
+    for (int i = 0; i < numFiles; i++) {
+      final File file = EasyMock.niceMock(File.class);
+      EasyMock.expect(file.length()).andReturn(5L).anyTimes();
+      EasyMock.replay(file);
+      files.add(file);
+    }
+    return files;
+  }
+
+  private static class TestFileInputSource extends AbstractInputSource implements SplittableInputSource<File>
+  {
+    private final List<File> files;
+
+    @JsonCreator
+    private TestFileInputSource(@JsonProperty("files") List<File> fileList)
+    {
+      files = fileList;
+    }
+
+    @JsonProperty
+    public List<File> getFiles()
+    {
+      return files;
+    }
+
+    @Override
+    public Stream<InputSplit<File>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+    {
+      return files.stream().map(InputSplit::new);
+    }
+
+    @Override
+    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+    {
+      return files.size();
+    }
+
+    @Override
+    public SplittableInputSource<File> withSplit(InputSplit<File> split)
+    {
+      return new TestFileInputSource(ImmutableList.of(split.get()));
+    }
+
+    @Override
+    public boolean needsFormat()
+    {
+      return true;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TestFileInputSource that = (TestFileInputSource) o;
+      return Objects.equals(files, that.files);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(files);
+    }
+  }
+
+  private static class TestUriInputSource extends AbstractInputSource implements SplittableInputSource<URI>
+  {
+    private final List<URI> uris;
+
+    @JsonCreator
+    private TestUriInputSource(@JsonProperty("uris") List<URI> uriList)
+    {
+      uris = uriList;
+    }
+
+    @JsonProperty
+    public List<URI> getUris()
+    {
+      return uris;
+    }
+
+    @Override
+    public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+    {
+      return uris.stream().map(InputSplit::new);
+    }
+
+    @Override
+    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+    {
+      return uris.size();
+    }
+
+    @Override
+    public SplittableInputSource<URI> withSplit(InputSplit<URI> split)
+    {
+      return new TestUriInputSource(ImmutableList.of(split.get()));
+    }
+
+    @Override
+    public boolean needsFormat()
+    {
+      return false;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TestUriInputSource that = (TestUriInputSource) o;
+      return Objects.equals(uris, that.uris);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(uris);
+    }
+  }
+}
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 7181efcd62b..9b718250b70 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -1368,6 +1368,48 @@ Compared to the other native batch InputSources, SQL InputSource behaves differe
 * Similar to file-based input formats, any updates to existing data will replace the data in segments specific to the intervals specified in the `granularitySpec`.
 
+### Combining Input Source
+
+The Combining input source reads data from multiple input sources. Use it only when all of the delegate input sources
+are _splittable_ and can be used by the [Parallel task](#parallel-task). This input source collects the splits from
+its delegates, and each split is processed by a worker task.
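+For example, you can combine a `local` delegate with a `druid` delegate to reindex existing segments together with
+new input files, as the sample spec below shows.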
+Similar to other input sources, this input source supports a single `inputFormat`. Note that delegate input sources
+that require an `inputFormat` must therefore all use the same format for their input data.
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|This should be "combining".|Yes|
+|delegates|List of _splittable_ InputSources to read data from.|Yes|
+
+Sample spec:
+
+```json
+...
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "combining",
+        "delegates": [
+          {
+            "type": "local",
+            "filter": "*.csv",
+            "baseDir": "/data/directory",
+            "files": ["/bar/foo", "/foo/bar"]
+          },
+          {
+            "type": "druid",
+            "dataSource": "wikipedia",
+            "interval": "2013-01-01/2013-01-02"
+          }
+        ]
+      },
+      "inputFormat": {
+        "type": "csv"
+      },
+      ...
+    },
+...
+```
+
 ###
 
 ## Firehoses (Deprecated)
diff --git a/examples/quickstart/tutorial/updates-append-index.json b/examples/quickstart/tutorial/updates-append-index.json
index 75d23356efd..9ba53a0b311 100644
--- a/examples/quickstart/tutorial/updates-append-index.json
+++ b/examples/quickstart/tutorial/updates-append-index.json
@@ -1,58 +1,61 @@
 {
-  "type" : "index",
-  "spec" : {
-    "dataSchema" : {
-      "dataSource" : "updates-tutorial",
-      "parser" : {
-        "type" : "string",
-        "parseSpec" : {
-          "format" : "json",
-          "dimensionsSpec" : {
-            "dimensions" : [
-              "animal"
-            ]
-          },
-          "timestampSpec": {
-            "column": "timestamp",
-            "format": "iso"
-          }
-        }
+  "type": "index_parallel",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "updates-tutorial",
+      "dimensionsSpec": {
+        "dimensions": [
+          "animal"
+        ]
       },
-      "metricsSpec" : [
-        { "type" : "count", "name" : "count" },
-        { "type" : "longSum", "name" : "number", "fieldName" : "number" }
+      "timestampSpec": {
+        "column": "timestamp",
+        "format": "iso"
+      },
+      "metricsSpec": [
+        { "type": "count", "name": "count"},
+        { "type": "longSum", "name": "number", "fieldName": "number"}
       ],
-      "granularitySpec" : {
-        "type" : "uniform",
-        "segmentGranularity" : "week",
-        "queryGranularity" : "minute",
-        "intervals" : ["2018-01-01/2018-01-03"],
-        "rollup" : true
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "week",
+        "queryGranularity": "minute",
+        "intervals": ["2018-01-01/2018-01-03"],
+        "rollup": true
       }
     },
-    "ioConfig" : {
-      "type" : "index",
-      "firehose" : {
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
         "type": "combining",
         "delegates": [
           {
-            "type" : "ingestSegment",
-            "dataSource" : "updates-tutorial",
-            "interval" : "2018-01-01/2018-01-03"
+            "type": "druid",
+            "dataSource": "updates-tutorial",
+            "interval": "2018-01-01/2018-01-03"
           },
           {
-            "type" : "local",
-            "baseDir" : "quickstart/tutorial",
-            "filter" : "updates-data3.json"
+            "type": "local",
+            "baseDir": "quickstart/tutorial",
+            "filter": "updates-data3.json"
           }
         ]
       },
-      "appendToExisting" : false
+      "inputFormat": {
+        "type": "json"
+      },
+      "appendToExisting": false
    },
-    "tuningConfig" : {
-      "type" : "index",
-      "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000
+    "tuningConfig": {
+      "type": "index_parallel",
+      "maxRowsPerSegment": 5000000,
+      "maxRowsInMemory": 25000,
+      "maxNumConcurrentSubTasks": 2,
+      "forceGuaranteedRollup": true,
+      "partitionsSpec": {
+        "type": "hashed",
+        "numShards": 1
+      }
     }
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java
new file mode 100644
index 00000000000..dd46d9b127b
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.function.Function;
+
+@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITCombiningInputSourceParallelIndexTest extends AbstractITBatchIndexTest
+{
+  private static final String INDEX_TASK = "/indexer/wikipedia_local_input_source_index_task.json";
+  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INDEX_DATASOURCE = "wikipedia_index_test";
+
+  private static final String COMBINING_INDEX_TASK = "/indexer/wikipedia_combining_input_source_index_parallel_task.json";
+  private static final String COMBINING_QUERIES_RESOURCE = "/indexer/wikipedia_combining_firehose_index_queries.json";
+  private static final String COMBINING_INDEX_DATASOURCE = "wikipedia_comb_index_test";
+
+  @Test
+  public void testIndexData() throws Exception
+  {
+    Map<String, Object> inputFormatMap = new ImmutableMap.Builder<String, Object>()
+        .put("type", "json")
+        .build();
+    try (
+        final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
+        final Closeable ignored2 = unloader(COMBINING_INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())
+    ) {
+      final Function<String, String> combiningInputSourceSpecTransform = spec -> {
+        try {
+          spec = StringUtils.replace(
+              spec,
+              "%%PARTITIONS_SPEC%%",
+              jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null))
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_FILTER%%",
+              "wikipedia_index_data*"
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_BASE_DIR%%",
+              "/resources/data/batch_index/json"
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_FORMAT%%",
+              jsonMapper.writeValueAsString(inputFormatMap)
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%APPEND_TO_EXISTING%%",
+              jsonMapper.writeValueAsString(false)
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%FORCE_GUARANTEED_ROLLUP%%",
+              jsonMapper.writeValueAsString(false)
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%COMBINING_DATASOURCE%%",
+              INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()
+          );
+          return spec;
+        }
+        catch (Exception e) {
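+          // Wrap checked Jackson exceptions so this spec transform can be used as a Function<String, String>.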
+          throw new RuntimeException(e);
+        }
+      };
+
+      doIndexTest(
+          INDEX_DATASOURCE,
+          INDEX_TASK,
+          combiningInputSourceSpecTransform,
+          INDEX_QUERIES_RESOURCE,
+          false,
+          true,
+          true
+      );
+      doIndexTest(
+          COMBINING_INDEX_DATASOURCE,
+          COMBINING_INDEX_TASK,
+          combiningInputSourceSpecTransform,
+          COMBINING_QUERIES_RESOURCE,
+          false,
+          true,
+          true
+      );
+    }
+  }
+}
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_combining_input_source_index_parallel_task.json b/integration-tests/src/test/resources/indexer/wikipedia_combining_input_source_index_parallel_task.json
new file mode 100644
index 00000000000..8e1d0941bf0
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_combining_input_source_index_parallel_task.json
@@ -0,0 +1,98 @@
+{
+  "type": "index_parallel",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "%%DATASOURCE%%",
+      "timestampSpec": {
+        "column": "timestamp"
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "added",
+          "fieldName": "added"
+        },
+        {
+          "type": "doubleSum",
+          "name": "deleted",
+          "fieldName": "deleted"
+        },
+        {
+          "type": "doubleSum",
+          "name": "delta",
+          "fieldName": "delta"
+        },
+        {
+          "name": "thetaSketch",
+          "type": "thetaSketch",
+          "fieldName": "user"
+        },
+        {
+          "name": "quantilesDoublesSketch",
+          "type": "quantilesDoublesSketch",
+          "fieldName": "delta"
+        },
+        {
+          "name": "HLLSketchBuild",
+          "type": "HLLSketchBuild",
+          "fieldName": "user"
+        }
+      ],
+      "granularitySpec": {
+        "segmentGranularity": "DAY",
+        "queryGranularity": "second",
+        "intervals": ["2013-08-31/2013-09-02"]
+      },
+      "dimensionsSpec": {
+        "dimensions": [
+          "page",
+          {"type": "string", "name": "language", "createBitmapIndex": false},
+          "user",
+          "unpatrolled",
+          "newPage",
+          "robot",
+          "anonymous",
+          "namespace",
+          "continent",
+          "country",
+          "region",
+          "city"
+        ]
+      }
+    },
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "combining",
+        "delegates": [
+          {
+            "type": "local",
+            "baseDir": "/resources/indexer",
+            "filter": "wikipedia_combining_index_data.json"
+          },
+          {
+            "type": "druid",
+            "dataSource": "%%COMBINING_DATASOURCE%%",
+            "interval": "2013-08-31/2013-09-02"
+          }
+        ]
+      },
+      "appendToExisting": %%APPEND_TO_EXISTING%%,
+      "inputFormat": %%INPUT_FORMAT%%
+    },
+    "tuningConfig": {
+      "type": "index_parallel",
+      "maxNumConcurrentSubTasks": 4,
+      "splitHintSpec": {
+        "type": "maxSize",
+        "maxNumFiles": 1
+      },
+      "forceGuaranteedRollup": %%FORCE_GUARANTEED_ROLLUP%%,
+      "partitionsSpec": %%PARTITIONS_SPEC%%
+    }
+  }
+}