mirror of https://github.com/apache/druid.git
Add integration tests for all InputFormat (#10088)
* Add integration tests for Avro OCF InputFormat
* Add integration tests for Avro OCF InputFormat
* add tests
* fix bug
* fix bug
* fix failing tests
* add comments
* address comments
* address comments
* address comments
* fix test data
* reduce resource needed for IT
* remove bug fix
* fix checkstyle
* add bug fix
parent 859ff6e9c0
commit 4e8570b71b
.travis.yml (17 changed lines)
@@ -313,6 +313,14 @@ jobs:
         docker exec -it druid-$v sh -c 'dmesg | tail -3' ;
       done

+    - &integration_input_format
+      name: "(Compile=openjdk8, Run=openjdk8) input format integration test"
+      jdk: openjdk8
+      services: *integration_test_services
+      env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=8'
+      script: *run_integration_test
+      after_failure: *integration_test_diags
+
     - &integration_perfect_rollup_parallel_batch_index
       name: "(Compile=openjdk8, Run=openjdk8) perfect rollup parallel batch index integration test"
       jdk: openjdk8

@@ -389,7 +397,7 @@ jobs:
       name: "(Compile=openjdk8, Run=openjdk8) other integration test"
       jdk: openjdk8
       services: *integration_test_services
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=8'
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=8'
       script: *run_integration_test
       after_failure: *integration_test_diags
   # END - Integration tests for Compile with Java 8 and Run with Java 8

@@ -400,6 +408,11 @@ jobs:
       jdk: openjdk8
       env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=11'

+    - <<: *integration_input_format
+      name: "(Compile=openjdk8, Run=openjdk11) input format integration test"
+      jdk: openjdk8
+      env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=11'
+
     - <<: *integration_perfect_rollup_parallel_batch_index
       name: "(Compile=openjdk8, Run=openjdk11) perfect rollup parallel batch index integration test"
       jdk: openjdk8

@@ -423,7 +436,7 @@ jobs:
     - <<: *integration_tests
       name: "(Compile=openjdk8, Run=openjdk11) other integration test"
       jdk: openjdk8
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=11'
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=11'
   # END - Integration tests for Compile with Java 8 and Run with Java 11

     - name: "security vulnerabilities"

@@ -19,11 +19,11 @@

 package org.apache.druid.query.aggregation.datasketches.quantiles;

+import com.google.common.primitives.Doubles;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.quantiles.DoublesSketch;
 import org.apache.datasketches.quantiles.UpdateDoublesSketch;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.column.ColumnBuilder;
 import org.apache.druid.segment.data.GenericIndexed;

@@ -75,18 +75,13 @@ public class DoublesSketchComplexMetricSerde extends ComplexMetricSerde
       // Autodetection of the input format: empty string, number, or base64 encoded sketch
       // A serialized DoublesSketch, as currently implemented, always has 0 in the first 6 bits.
       // This corresponds to "A" in base64, so it is not a digit
+      final Double doubleValue;
       if (objectString.isEmpty()) {
         return DoublesSketchOperations.EMPTY_SKETCH;
-      } else if (Character.isDigit(objectString.charAt(0))) {
-        try {
-          double doubleValue = Double.parseDouble(objectString);
-          UpdateDoublesSketch sketch = DoublesSketch.builder().setK(MIN_K).build();
-          sketch.update(doubleValue);
-          return sketch;
-        }
-        catch (NumberFormatException e) {
-          throw new IAE("Expected a string with a number, received value " + objectString);
-        }
+      } else if ((doubleValue = Doubles.tryParse(objectString)) != null) {
+        UpdateDoublesSketch sketch = DoublesSketch.builder().setK(MIN_K).build();
+        sketch.update(doubleValue);
+        return sketch;
       }
     } else if (object instanceof Number) { // this is for reindexing
       UpdateDoublesSketch sketch = DoublesSketch.builder().setK(MIN_K).build();

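The swap above, from a first-character Character.isDigit check to Guava's Doubles.tryParse, is what lets negative and leading-decimal strings be treated as raw numbers instead of being mistaken for base64-encoded sketches or rejected. A minimal standalone sketch of the difference (not part of the commit; the class name and sample strings are illustrative):

import com.google.common.primitives.Doubles;

// Illustrative only: compares the old and new numeric-detection logic on sample inputs.
public class NumberDetectionSketch
{
  public static void main(String[] args)
  {
    // "AgMDAAAazJM=" stands in for a base64-encoded sketch; any non-numeric string behaves the same.
    String[] samples = {"777", "-133", ".1", "3.1", "", "AgMDAAAazJM="};
    for (String s : samples) {
      boolean oldCheck = !s.isEmpty() && Character.isDigit(s.charAt(0)); // pre-commit check
      Double parsed = s.isEmpty() ? null : Doubles.tryParse(s);          // post-commit check
      System.out.printf("%-16s oldCheck=%-5b tryParse=%s%n", "\"" + s + "\"", oldCheck, parsed);
    }
  }
}

The new unit tests that follow (positive, negative, decimal, leading decimal) exercise exactly these cases.
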
@@ -42,7 +42,33 @@ public class DoublesSketchComplexMetricSerdeTest
   }

   @Test
-  public void testExtractorOnNumber()
+  public void testExtractorOnPositiveNumber()
+  {
+    final DoublesSketchComplexMetricSerde serde = new DoublesSketchComplexMetricSerde();
+    final ComplexMetricExtractor extractor = serde.getExtractor();
+    final DoublesSketch sketch = (DoublesSketch) extractor.extractValue(
+        new MapBasedInputRow(0L, ImmutableList.of(), ImmutableMap.of("foo", "777")),
+        "foo"
+    );
+    Assert.assertEquals(1, sketch.getRetainedItems());
+    Assert.assertEquals(777d, sketch.getMaxValue(), 0.01d);
+  }
+
+  @Test
+  public void testExtractorOnNegativeNumber()
+  {
+    final DoublesSketchComplexMetricSerde serde = new DoublesSketchComplexMetricSerde();
+    final ComplexMetricExtractor extractor = serde.getExtractor();
+    final DoublesSketch sketch = (DoublesSketch) extractor.extractValue(
+        new MapBasedInputRow(0L, ImmutableList.of(), ImmutableMap.of("foo", "-133")),
+        "foo"
+    );
+    Assert.assertEquals(1, sketch.getRetainedItems());
+    Assert.assertEquals(-133d, sketch.getMaxValue(), 0.01d);
+  }
+
+  @Test
+  public void testExtractorOnDecimalNumber()
   {
     final DoublesSketchComplexMetricSerde serde = new DoublesSketchComplexMetricSerde();
     final ComplexMetricExtractor extractor = serde.getExtractor();

@@ -53,4 +79,17 @@ public class DoublesSketchComplexMetricSerdeTest
     Assert.assertEquals(1, sketch.getRetainedItems());
     Assert.assertEquals(3.1d, sketch.getMaxValue(), 0.01d);
   }
+
+  @Test
+  public void testExtractorOnLeadingDecimalNumber()
+  {
+    final DoublesSketchComplexMetricSerde serde = new DoublesSketchComplexMetricSerde();
+    final ComplexMetricExtractor extractor = serde.getExtractor();
+    final DoublesSketch sketch = (DoublesSketch) extractor.extractValue(
+        new MapBasedInputRow(0L, ImmutableList.of(), ImmutableMap.of("foo", ".1")),
+        "foo"
+    );
+    Assert.assertEquals(1, sketch.getRetainedItems());
+    Assert.assertEquals(0.1d, sketch.getMaxValue(), 0.01d);
+  }
 }

@@ -279,8 +279,8 @@ credentials/configs may need to be set in the same file as your Druid's Hadoop c
 If you are running ITHadoopIndexTest with your own Druid + Hadoop cluster, please follow the below steps:
 - Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json
   located in integration-tests/src/test/resources/data/batch_index/json to your HDFS at /batch_index/json/
-- Copy batch_hadoop.data located in integration-tests/src/test/resources/data/batch_index/tsv to your HDFS
-  at /batch_index/tsv/
+- Copy batch_hadoop.data located in integration-tests/src/test/resources/data/batch_index/hadoop_tsv to your HDFS
+  at /batch_index/hadoop_tsv/
 If using the Docker-based Hadoop container, the steps above are automatically done by the integration tests.

 When running the Hadoop tests, you must set `-Dextra.datasource.name.suffix=''`, due to https://github.com/apache/druid/issues/9788.

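For the manual copy step described above, a hedged sketch using the standard Hadoop FileSystem API; the namenode URI is an assumption for your own cluster, and the paths simply mirror the README:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: copies batch_hadoop.data to the renamed HDFS location.
public class CopyBatchHadoopData
{
  public static void main(String[] args) throws Exception
  {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://your-namenode:8020"); // assumption: adjust for your cluster
    try (FileSystem fs = FileSystem.get(conf)) {
      fs.mkdirs(new Path("/batch_index/hadoop_tsv"));
      fs.copyFromLocalFile(
          new Path("integration-tests/src/test/resources/data/batch_index/hadoop_tsv/batch_hadoop.data"),
          new Path("/batch_index/hadoop_tsv/batch_hadoop.data")
      );
    }
  }
}
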
@@ -49,11 +49,13 @@ mv $SHARED_DIR/docker/lib/druid-hdfs-storage-* $SHARED_DIR/docker/extensions/dru
 mkdir -p $SHARED_DIR/docker/extensions/druid-kinesis-indexing-service
 mv $SHARED_DIR/docker/lib/druid-kinesis-indexing-service-* $SHARED_DIR/docker/extensions/druid-kinesis-indexing-service
 # For druid-parquet-extensions
+# Using cp so that this extensions is included when running Druid without loadList and as a option for the loadList
 mkdir -p $SHARED_DIR/docker/extensions/druid-parquet-extensions
-mv $SHARED_DIR/docker/lib/druid-parquet-extensions-* $SHARED_DIR/docker/extensions/druid-parquet-extensions
+cp $SHARED_DIR/docker/lib/druid-parquet-extensions-* $SHARED_DIR/docker/extensions/druid-parquet-extensions
 # For druid-orc-extensions
+# Using cp so that this extensions is included when running Druid without loadList and as a option for the loadList
 mkdir -p $SHARED_DIR/docker/extensions/druid-orc-extensions
-mv $SHARED_DIR/docker/lib/druid-orc-extensions-* $SHARED_DIR/docker/extensions/druid-orc-extensions
+cp $SHARED_DIR/docker/lib/druid-orc-extensions-* $SHARED_DIR/docker/extensions/druid-orc-extensions

 # Pull Hadoop dependency if needed
 if [ -n "$DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER" == true ]

@@ -27,6 +27,8 @@ public class TestNGGroup
 {
   public static final String BATCH_INDEX = "batch-index";

+  public static final String INPUT_FORMAT = "input-format";
+
   public static final String KAFKA_INDEX = "kafka-index";

   public static final String KAFKA_INDEX_SLOW = "kafka-index-slow";

@@ -42,8 +42,8 @@ import java.util.function.Function;
 * 1) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json
 * located in integration-tests/src/test/resources/data/batch_index/json to your HDFS at /batch_index/json/
 * If using the Docker-based Hadoop container, this is automatically done by the integration tests.
-* 2) Copy batch_hadoop.data located in integration-tests/src/test/resources/data/batch_index/tsv to your HDFS
-* at /batch_index/tsv/
+* 2) Copy batch_hadoop.data located in integration-tests/src/test/resources/data/batch_index/hadoop_tsv to your HDFS
+* at /batch_index/hadoop_tsv/
 * If using the Docker-based Hadoop container, this is automatically done by the integration tests.
 * 2) Provide -Doverride.config.path=<PATH_TO_FILE> with HDFS configs set. See
 * integration-tests/docker/environment-configs/override-examples/hdfs for env vars to provide.

@@ -50,6 +50,9 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
 {
   public enum InputFormatDetails
   {
+    AVRO("avro_ocf", ".avro", "/avro"),
+    CSV("csv", ".csv", "/csv"),
+    TSV("tsv", ".tsv", "/tsv"),
     ORC("orc", ".orc", "/orc"),
     JSON("json", ".json", "/json"),
     PARQUET("parquet", ".parquet", "/parquet");

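For orientation, a hedged sketch (not the actual Druid source) of the shape these enum constants imply. The field names are assumptions; the three getters are the ones the new AbstractLocalInputSourceParallelIndexTest below relies on:

// Hedged sketch only: a minimal enum matching the (inputFormatType, fileExtension, folderSuffix)
// triples shown in the diff above.
public enum InputFormatDetailsSketch
{
  AVRO("avro_ocf", ".avro", "/avro"),
  CSV("csv", ".csv", "/csv"),
  TSV("tsv", ".tsv", "/tsv"),
  ORC("orc", ".orc", "/orc"),
  JSON("json", ".json", "/json"),
  PARQUET("parquet", ".parquet", "/parquet");

  private final String inputFormatType; // used as the "type" of the inputFormat spec
  private final String fileExtension;   // used to build the input source filter, e.g. "*.avro"
  private final String folderSuffix;    // appended to the batch_index resources base directory

  InputFormatDetailsSketch(String inputFormatType, String fileExtension, String folderSuffix)
  {
    this.inputFormatType = inputFormatType;
    this.fileExtension = fileExtension;
    this.folderSuffix = folderSuffix;
  }

  public String getInputFormatType()
  {
    return inputFormatType;
  }

  public String getFileExtension()
  {
    return fileExtension;
  }

  public String getFolderSuffix()
  {
    return folderSuffix;
  }
}
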
@@ -0,0 +1,91 @@
/* ... standard Apache Software Foundation license header (Apache License, Version 2.0) ... */

package org.apache.druid.tests.indexer;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.java.util.common.StringUtils;

import javax.annotation.Nonnull;
import java.io.Closeable;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

public abstract class AbstractLocalInputSourceParallelIndexTest extends AbstractITBatchIndexTest
{
  private static final String INDEX_TASK = "/indexer/wikipedia_local_input_source_index_task.json";
  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";

  public void doIndexTest(InputFormatDetails inputFormatDetails) throws Exception
  {
    doIndexTest(inputFormatDetails, ImmutableMap.of());
  }

  public void doIndexTest(InputFormatDetails inputFormatDetails, @Nonnull Map<String, Object> extraInputFormatMap) throws Exception
  {
    final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
    Map inputFormatMap = new ImmutableMap.Builder<String, Object>().putAll(extraInputFormatMap)
                                                                   .put("type", inputFormatDetails.getInputFormatType())
                                                                   .build();
    try (
        final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix());
    ) {
      final Function<String, String> sqlInputSourcePropsTransform = spec -> {
        try {
          spec = StringUtils.replace(
              spec,
              "%%PARTITIONS_SPEC%%",
              jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null))
          );
          spec = StringUtils.replace(
              spec,
              "%%INPUT_SOURCE_FILTER%%",
              "*" + inputFormatDetails.getFileExtension()
          );
          spec = StringUtils.replace(
              spec,
              "%%INPUT_SOURCE_BASE_DIR%%",
              "/resources/data/batch_index" + inputFormatDetails.getFolderSuffix()
          );
          spec = StringUtils.replace(
              spec,
              "%%INPUT_FORMAT%%",
              jsonMapper.writeValueAsString(inputFormatMap)
          );
          return spec;
        }
        catch (Exception e) {
          throw new RuntimeException(e);
        }
      };

      doIndexTest(
          indexDatasource,
          INDEX_TASK,
          sqlInputSourcePropsTransform,
          INDEX_QUERIES_RESOURCE,
          false,
          true,
          true
      );
    }
  }
}

@@ -0,0 +1,99 @@
/* ... standard Apache Software Foundation license header (Apache License, Version 2.0) ... */

package org.apache.druid.tests.indexer;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Map;

@Test(groups = TestNGGroup.INPUT_FORMAT)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITLocalInputSourceAllInputFormatTest extends AbstractLocalInputSourceParallelIndexTest
{
  @Test
  public void testAvroInputFormatIndexDataIngestionSpecWithSchema() throws Exception
  {
    List fieldList = ImmutableList.of(
        ImmutableMap.of("name", "timestamp", "type", "string"),
        ImmutableMap.of("name", "page", "type", "string"),
        ImmutableMap.of("name", "language", "type", "string"),
        ImmutableMap.of("name", "user", "type", "string"),
        ImmutableMap.of("name", "unpatrolled", "type", "string"),
        ImmutableMap.of("name", "newPage", "type", "string"),
        ImmutableMap.of("name", "robot", "type", "string"),
        ImmutableMap.of("name", "anonymous", "type", "string"),
        ImmutableMap.of("name", "namespace", "type", "string"),
        ImmutableMap.of("name", "continent", "type", "string"),
        ImmutableMap.of("name", "country", "type", "string"),
        ImmutableMap.of("name", "region", "type", "string"),
        ImmutableMap.of("name", "city", "type", "string"),
        ImmutableMap.of("name", "added", "type", "int"),
        ImmutableMap.of("name", "deleted", "type", "int"),
        ImmutableMap.of("name", "delta", "type", "int")
    );
    Map schema = ImmutableMap.of("namespace", "org.apache.druid.data.input",
                                 "type", "record",
                                 "name", "wikipedia",
                                 "fields", fieldList);
    doIndexTest(InputFormatDetails.AVRO, ImmutableMap.of("schema", schema));
  }

  @Test
  public void testAvroInputFormatIndexDataIngestionSpecWithoutSchema() throws Exception
  {
    doIndexTest(InputFormatDetails.AVRO);
  }

  @Test
  public void testJsonInputFormatIndexDataIngestionSpecWithSchema() throws Exception
  {
    doIndexTest(InputFormatDetails.JSON);
  }

  @Test
  public void testTsvInputFormatIndexDataIngestionSpecWithSchema() throws Exception
  {
    doIndexTest(InputFormatDetails.TSV, ImmutableMap.of("findColumnsFromHeader", true));
  }

  @Test
  public void testParquetInputFormatIndexDataIngestionSpecWithSchema() throws Exception
  {
    doIndexTest(InputFormatDetails.PARQUET);
  }

  @Test
  public void testOrcInputFormatIndexDataIngestionSpecWithSchema() throws Exception
  {
    doIndexTest(InputFormatDetails.ORC);
  }

  @Test
  public void testCsvInputFormatIndexDataIngestionSpecWithSchema() throws Exception
  {
    doIndexTest(InputFormatDetails.CSV, ImmutableMap.of("findColumnsFromHeader", true));
  }
}

3 binary files not shown.
@@ -0,0 +1,4 @@
timestamp,page,language,user,unpatrolled,newPage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta
2013-08-31T01:02:33Z,Gypsy Danger,en,nuclear,TRUE,TRUE,FALSE,FALSE,article,North America,United States,Bay Area,San Francisco,57,200,-143
2013-08-31T03:32:45Z,Striker Eureka,en,speed,FALSE,TRUE,TRUE,FALSE,wikipedia,Australia,Australia,Cantebury,Syndey,459,129,330
2013-08-31T07:11:21Z,Cherno Alpha,ru,masterYi,FALSE,TRUE,TRUE,FALSE,article,Asia,Russia,Oblast,Moscow,123,12,111

@@ -0,0 +1,4 @@
timestamp,page,language,user,unpatrolled,newPage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta
2013-08-31T11:58:39Z,Crimson Typhoon,zh,triplets,TRUE,FALSE,TRUE,FALSE,wikipedia,Asia,China,Shanxi,Taiyuan,905,5,900
2013-08-31T12:41:27Z,Coyote Tango,ja,stringer,TRUE,FALSE,TRUE,FALSE,wikipedia,Asia,Japan,Kanto,Tokyo,1,10,-9
2013-09-01T01:02:33Z,Gypsy Danger,en,nuclear,TRUE,TRUE,FALSE,FALSE,article,North America,United States,Bay Area,San Francisco,57,200,-143

@@ -0,0 +1,5 @@
timestamp,page,language,user,unpatrolled,newPage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta
2013-09-01T03:32:45Z,Striker Eureka,en,speed,FALSE,TRUE,TRUE,FALSE,wikipedia,Australia,Australia,Cantebury,Syndey,459,129,330
2013-09-01T07:11:21Z,Cherno Alpha,ru,masterYi,FALSE,TRUE,TRUE,FALSE,article,Asia,Russia,Oblast,Moscow,123,12,111
2013-09-01T11:58:39Z,Crimson Typhoon,zh,triplets,TRUE,FALSE,TRUE,FALSE,wikipedia,Asia,China,Shanxi,Taiyuan,905,5,900
2013-09-01T12:41:27Z,Coyote Tango,ja,stringer,TRUE,FALSE,TRUE,FALSE,wikipedia,Asia,Japan,Kanto,Tokyo,1,10,-9

@@ -0,0 +1,4 @@
timestamp page language user unpatrolled newPage robot anonymous namespace continent country region city added deleted delta
2013-08-31T01:02:33Z Gypsy Danger en nuclear true true false false article North America United States Bay Area San Francisco 57 200 -143
2013-08-31T03:32:45Z Striker Eureka en speed false true true false wikipedia Australia Australia Cantebury Syndey 459 129 330
2013-08-31T07:11:21Z Cherno Alpha ru masterYi false true true false article Asia Russia Oblast Moscow 123 12 111

@@ -0,0 +1,4 @@
timestamp page language user unpatrolled newPage robot anonymous namespace continent country region city added deleted delta
2013-08-31T11:58:39Z Crimson Typhoon zh triplets true false true false wikipedia Asia China Shanxi Taiyuan 905 5 900
2013-08-31T12:41:27Z Coyote Tango ja stringer true false true false wikipedia Asia Japan Kanto Tokyo 1 10 -9
2013-09-01T01:02:33Z Gypsy Danger en nuclear true true false false article North America United States Bay Area San Francisco 57 200 -143

@@ -0,0 +1,5 @@
timestamp page language user unpatrolled newPage robot anonymous namespace continent country region city added deleted delta
2013-09-01T03:32:45Z Striker Eureka en speed false true true false wikipedia Australia Australia Cantebury Syndey 459 129 330
2013-09-01T07:11:21Z Cherno Alpha ru masterYi false true true false article Asia Russia Oblast Moscow 123 12 111
2013-09-01T11:58:39Z Crimson Typhoon zh triplets true false true false wikipedia Asia China Shanxi Taiyuan 905 5 900
2013-09-01T12:41:27Z Coyote Tango ja stringer true false true false wikipedia Asia Japan Kanto Tokyo 1 10 -9

@@ -53,7 +53,7 @@
       "type": "hadoop",
       "inputSpec": {
         "type": "static",
-        "paths": "/batch_index/tsv/batch_hadoop.data"
+        "paths": "/batch_index/hadoop_tsv/batch_hadoop.data"
       }
     },
     "tuningConfig": {

@@ -0,0 +1,86 @@
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "%%DATASOURCE%%",
      "timestampSpec": {
        "column": "timestamp"
      },
      "dimensionsSpec": {
        "dimensions": [
          "page",
          {"type": "string", "name": "language", "createBitmapIndex": false},
          "user",
          "unpatrolled",
          "newPage",
          "robot",
          "anonymous",
          "namespace",
          "continent",
          "country",
          "region",
          "city"
        ]
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "doubleSum",
          "name": "added",
          "fieldName": "added"
        },
        {
          "type": "doubleSum",
          "name": "deleted",
          "fieldName": "deleted"
        },
        {
          "type": "doubleSum",
          "name": "delta",
          "fieldName": "delta"
        },
        {
          "name": "thetaSketch",
          "type": "thetaSketch",
          "fieldName": "user"
        },
        {
          "name": "quantilesDoublesSketch",
          "type": "quantilesDoublesSketch",
          "fieldName": "delta"
        },
        {
          "name": "HLLSketchBuild",
          "type": "HLLSketchBuild",
          "fieldName": "user"
        }
      ],
      "granularitySpec": {
        "segmentGranularity": "DAY",
        "queryGranularity": "second",
        "intervals" : [ "2013-08-31/2013-09-02" ]
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "filter" : "%%INPUT_SOURCE_FILTER%%",
        "baseDir": "%%INPUT_SOURCE_BASE_DIR%%"
      },
      "inputFormat": %%INPUT_FORMAT%%
    },
    "tuningConfig": {
      "type": "index_parallel",
      "maxNumConcurrentSubTasks": 10,
      "splitHintSpec": {
        "type": "maxSize",
        "maxSplitSize": 1
      },
      "partitionsSpec": %%PARTITIONS_SPEC%%
    }
  }
}