DruidInputSource can add new dimensions during re-ingestion (#9590)

* WIP integration tests

* Add integration test for ingestion with transformSpec

* WIP almost working tests

* Add ignored tests

* checkstyle stuff

* remove newPage from index task ingestion spec

* more test cleanup

* still not quite working

* Actually disable the tests

* working tests

* fix codestyle

* don't use junit in integration tests

* actually fix the bug

* fix checkstyle

* bring index tests closer to reindex tests
This commit is contained in:
Suneet Saldanha 2020-04-02 17:32:31 -07:00 committed by GitHub
parent dbaabdd247
commit af3337dac8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 720 additions and 24 deletions

View File

@ -55,7 +55,7 @@
"baseDir" : "quickstart/tutorial",
"filter" : "transform-data.json"
},
"inpuFormat" : {
"inputFormat" : {
"type" : "json"
},
"appendToExisting" : false

View File

@ -22,11 +22,13 @@ package org.apache.druid.indexing.common;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.PartitionChunk;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@ -101,4 +103,34 @@ public class ReingestionTimelineUtils
.mapToObj(orderedMetrics::get)
.collect(Collectors.toList());
}
/**
 * Picks the dimension names to use when re-ingesting a datasource. The first rule that applies wins:
 * <ol>
 * <li>{@code explicitDimensions}, whenever it is non-null (the same list instance is returned).</li>
 * <li>The dimension names of {@code dimensionsSpec}, when it reports having custom dimensions.</li>
 * <li>The unique dimensions calculated from {@code timeLineSegments}, with the dimension exclusions of
 * {@code dimensionsSpec} handed to {@link ReingestionTimelineUtils#getUniqueDimensions}.</li>
 * </ol>
 *
 * @param explicitDimensions dimensions sent as part of the re-ingestion InputSource; may be null
 * @param dimensionsSpec     dimensions spec from the provided ingestion spec
 * @param timeLineSegments   timeline segments of the datasource that is being read
 * @return the dimension names selected by the first applicable rule above
 */
public static List<String> getDimensionsToReingest(
    @Nullable List<String> explicitDimensions,
    @NotNull DimensionsSpec dimensionsSpec,
    @NotNull List<TimelineObjectHolder<String, DataSegment>> timeLineSegments
)
{
  // Dimensions named explicitly on the input source take precedence over everything else.
  if (explicitDimensions != null) {
    return explicitDimensions;
  }

  // Next, honour custom dimensions declared in the ingestion spec.
  if (dimensionsSpec.hasCustomDimensions()) {
    return dimensionsSpec.getDimensionNames();
  }

  // Otherwise fall back to what the timeline segments contain.
  return ReingestionTimelineUtils.getUniqueDimensions(timeLineSegments, dimensionsSpec.getDimensionExclusions());
}
}

View File

@ -214,18 +214,11 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
}
}
final List<String> dims;
if (dimensions != null) {
dims = dimensions;
} else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
} else {
dims = ReingestionTimelineUtils.getUniqueDimensions(
timeLineSegments,
inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions()
);
}
final List<String> dims = ReingestionTimelineUtils.getDimensionsToReingest(
dimensions,
inputRowParser.getParseSpec().getDimensionsSpec(),
timeLineSegments
);
final List<String> metricsList = metrics == null
? ReingestionTimelineUtils.getUniqueMetrics(timeLineSegments)
: metrics;

View File

@ -181,17 +181,11 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
.from(partitionHolder)
.transform(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval()));
}).iterator();
final List<String> effectiveDimensions;
if (dimensions == null) {
effectiveDimensions = ReingestionTimelineUtils.getUniqueDimensions(
timeline,
inputRowSchema.getDimensionsSpec().getDimensionExclusions()
);
} else if (inputRowSchema.getDimensionsSpec().hasCustomDimensions()) {
effectiveDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
} else {
effectiveDimensions = dimensions;
}
final List<String> effectiveDimensions = ReingestionTimelineUtils.getDimensionsToReingest(
dimensions,
inputRowSchema.getDimensionsSpec(),
timeline
);
List<String> effectiveMetrics;
if (metrics == null) {

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.io.IOException;
/**
 * Integration tests that submit index and re-index task specs carrying a transformSpec and check them
 * against the matching query resource files.
 */
@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITTransformTest extends AbstractITBatchIndexTest
{
  // Resources and datasource name for the initial index step.
  private static final String INDEX_TASK_WITH_FIREHOSE = "/indexer/wikipedia_index_task_with_transform.json";
  private static final String INDEX_TASK_WITH_INPUT_SOURCE = "/indexer/wikipedia_index_task_with_inputsource_transform.json";
  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries_with_transform.json";
  private static final String INDEX_DATASOURCE = "wikipedia_index_test";

  // Resources and datasource name prefix for the re-index step.
  private static final String REINDEX_TASK = "/indexer/wikipedia_reindex_task_with_transforms.json";
  private static final String REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_reindex_druid_input_source_task_with_transforms.json";
  private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_reindex_queries_with_transforms.json";
  private static final String REINDEX_DATASOURCE = "wikipedia_reindex_test";

  /**
   * Runs the input-source index task, then re-indexes the result with the druid input source task.
   * The handles returned by {@code unloader(...)} are closed when the try block exits.
   */
  @Test
  public void testIndexAndReIndexWithTransformSpec() throws IOException
  {
    final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-druidInputSource";
    try (
        Closeable indexUnloader = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
        Closeable reindexUnloader = unloader(
            reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix()
        )
    ) {
      doIndexTest(INDEX_DATASOURCE, INDEX_TASK_WITH_INPUT_SOURCE, INDEX_QUERIES_RESOURCE, false, true, true);
      doReindexTest(
          INDEX_DATASOURCE,
          reindexDatasourceWithDruidInputSource,
          REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
          REINDEX_QUERIES_RESOURCE
      );
    }
  }

  /**
   * Disabled. Same flow as {@link #testIndexAndReIndexWithTransformSpec()}, except that the re-index
   * step uses the {@code REINDEX_TASK} spec.
   */
  @Test(enabled = false)
  public void testIndexAndReIndexUsingIngestSegmentWithTransforms() throws IOException
  {
    // TODO: re-instate this test when https://github.com/apache/druid/issues/9591 is fixed
    // Move the re-index step into testIndexAndReIndexWithTransformSpec for faster tests!
    final String reindexDatasource = REINDEX_DATASOURCE + "-testIndexData";
    try (
        Closeable indexUnloader = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
        Closeable reindexUnloader = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix())
    ) {
      doIndexTest(INDEX_DATASOURCE, INDEX_TASK_WITH_INPUT_SOURCE, INDEX_QUERIES_RESOURCE, false, true, true);
      doReindexTest(INDEX_DATASOURCE, reindexDatasource, REINDEX_TASK, REINDEX_QUERIES_RESOURCE);
    }
  }

  /**
   * Disabled. Runs only the index step, using the {@code INDEX_TASK_WITH_FIREHOSE} spec against a
   * datasource name suffixed with "-firehose".
   */
  @Test(enabled = false)
  public void testIndexWithFirehoseAndTransforms() throws IOException
  {
    // TODO: re-instate this test when https://github.com/apache/druid/issues/9589 is fixed
    final String indexDatasource = INDEX_DATASOURCE + "-firehose";
    try (Closeable indexUnloader = unloader(indexDatasource + config.getExtraDatasourceNameSuffix())) {
      doIndexTest(indexDatasource, INDEX_TASK_WITH_FIREHOSE, INDEX_QUERIES_RESOURCE, false, true, true);
    }
  }
}

View File

@ -0,0 +1,62 @@
[
{
"description":"having spec on post aggregation",
"query":{
"queryType":"groupBy",
"dataSource":"%%DATASOURCE%%",
"granularity":"day",
"dimensions":[
"page",
"city"
],
"filter":{
"type":"selector",
"dimension":"language",
"value":"language-zh"
},
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"longSum",
"fieldName":"triple-added",
"name":"added_count"
},
{
"type":"longSum",
"fieldName":"delta",
"name":"delta_sum"
}
],
"postAggregations": [
{
"type":"arithmetic",
"name":"added_count_times_ten",
"fn":"*",
"fields":[
{"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
{"type":"constant", "name":"const", "value":10}
]
}
],
"having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
]
},
"expectedResults":[ {
"version" : "v1",
"timestamp" : "2013-08-31T00:00:00.000Z",
"event" : {
"added_count_times_ten" : 27150.0,
"page" : "Crimson Typhoon",
"city" : "Taiyuan",
"added_count" : 2715,
"delta_sum" : 900,
"rows" : 1
}
} ]
}
]

View File

@ -0,0 +1,103 @@
{
"type" : "index",
"spec" : {
"dataSchema" : {
"dataSource" : "%%DATASOURCE%%",
"timestampSpec": {
"column": "timestamp"
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "triple-added",
"fieldName": "triple-added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
},
{
"name": "thetaSketch",
"type": "thetaSketch",
"fieldName": "user"
},
{
"name": "quantilesDoublesSketch",
"type": "quantilesDoublesSketch",
"fieldName": "delta"
},
{
"name": "HLLSketchBuild",
"type": "HLLSketchBuild",
"fieldName": "user"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "second",
"intervals" : [ "2013-08-31/2013-09-02" ]
},
"dimensionsSpec": {
"dimensions": [
"page",
{"type": "string", "name": "language", "createBitmapIndex": false},
"user",
"unpatrolled",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
]
},
"transformSpec": {
"transforms": [
{
"type": "expression",
"name": "language",
"expression": "concat('language-', language)"
},
{
"type": "expression",
"name": "triple-added",
"expression": "added * 3"
}
]
}
},
"ioConfig" : {
"type" : "index",
"inputSource" : {
"type" : "local",
"baseDir" : "/resources/data/batch_index",
"filter" : "wikipedia_index_data*"
},
"inputFormat" : {
"type" : "json"
},
"appendToExisting" : false
},
"tuningConfig" : {
"type" : "index",
"maxRowsPerSegment" : 5000000,
"maxRowsInMemory" : 25000
}
}
}

View File

@ -0,0 +1,103 @@
{
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "triple-added",
"fieldName": "triple-added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
},
{
"name": "thetaSketch",
"type": "thetaSketch",
"fieldName": "user"
},
{
"name": "quantilesDoublesSketch",
"type": "quantilesDoublesSketch",
"fieldName": "delta"
},
{
"name": "HLLSketchBuild",
"type": "HLLSketchBuild",
"fieldName": "user"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "second",
"intervals" : [ "2013-08-31/2013-09-02" ]
},
"parser": {
"parseSpec": {
"format" : "json",
"timestampSpec": {
"column": "timestamp"
},
"dimensionsSpec": {
"dimensions": [
"page",
{"type": "string", "name": "language", "createBitmapIndex": false},
"user",
"unpatrolled",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
]
}
}
},
"transformSpec": {
"transforms": [
{
"type": "expression",
"name": "language",
"expression": "concat('language-', language)"
},
{
"type": "expression",
"name": "triple-added",
"expression": "added * 3"
}
]
}
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "local",
"baseDir": "/resources/data/batch_index",
"filter": "wikipedia_index_data*"
}
},
"tuningConfig": {
"type": "index",
"maxRowsPerSegment": 3
}
}
}

View File

@ -0,0 +1,106 @@
{
"type": "index",
"spec": {
"ioConfig": {
"type": "index",
"inputSource": {
"type": "druid",
"dataSource": "%%DATASOURCE%%",
"interval": "2013-08-31/2013-09-01"
}
},
"tuningConfig": {
"type": "index",
"partitionsSpec": {
"type": "dynamic"
}
},
"dataSchema": {
"dataSource": "%%REINDEX_DATASOURCE%%",
"granularitySpec": {
"type": "uniform",
"queryGranularity": "SECOND",
"segmentGranularity": "DAY"
},
"timestampSpec": {
"column": "__time",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
{"type": "string", "name": "language", "createBitmapIndex": false},
"user",
"unpatrolled",
"page",
"newPage",
"anonymous",
"namespace",
"country",
"region",
"city"
]
},
"transformSpec": {
"transforms": [
{
"type": "expression",
"name": "newPage",
"expression": "page"
},
{
"type": "expression",
"name": "city",
"expression": "concat('city-', city)"
},
{
"type": "expression",
"name": "one-plus-triple-added",
"expression": "\"triple-added\" + 1"
},
{
"type": "expression",
"name": "delta",
"expression": "\"delta\" / 2"
},
{
"type": "expression",
"name": "double-deleted",
"expression": "deleted * 2"
}
]
},
"metricsSpec": [
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "triple-added",
"fieldName": "triple-added"
},
{
"type": "doubleSum",
"name": "one-plus-triple-added",
"fieldName": "one-plus-triple-added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "double-deleted",
"fieldName": "double-deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
]
}
}
}

View File

@ -0,0 +1,80 @@
[
{
"description": "timeBoundary, min and max time",
"query":{
"queryType" : "timeBoundary",
"dataSource": "%%DATASOURCE%%"
},
"expectedResults":[
{
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-08-31T12:41:27.000Z"
}
}
]
},
{
"description":"having spec on post aggregation",
"query":{
"queryType":"groupBy",
"dataSource":"%%DATASOURCE%%",
"granularity":"day",
"dimensions":[
"newPage",
"city"
],
"filter":{
"type":"selector",
"dimension":"language",
"value":"language-zh"
},
"aggregations":[
{
"type":"longSum",
"fieldName":"one-plus-triple-added",
"name":"added_count"
},
{
"type":"longSum",
"fieldName":"double-deleted",
"name":"double_deleted_count"
},
{
"type":"longSum",
"fieldName":"delta",
"name":"delta_overshadowed"
}
],
"postAggregations": [
{
"type":"arithmetic",
"name":"added_count_times_ten",
"fn":"*",
"fields":[
{"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
{"type":"constant", "name":"const", "value":10}
]
}
],
"having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
]
},
"expectedResults":[ {
"version" : "v1",
"timestamp" : "2013-08-31T00:00:00.000Z",
"event" : {
"added_count_times_ten" : 27160.0,
"newPage" : "Crimson Typhoon",
"city" : "city-Taiyuan",
"double_deleted_count" : 10,
"delta_overshadowed" : 450,
"added_count" : 2716
}
} ]
}
]

View File

@ -0,0 +1,108 @@
{
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "%%REINDEX_DATASOURCE%%",
"metricsSpec": [
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "triple-added",
"fieldName": "triple-added"
},
{
"type": "doubleSum",
"name": "one-plus-triple-added",
"fieldName": "one-plus-triple-added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "double-deleted",
"fieldName": "double-deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "second",
"intervals" : [ "2013-08-31/2013-09-01" ]
},
"parser": {
"parseSpec": {
"format" : "json",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
{"type": "string", "name": "language", "createBitmapIndex": false},
"user",
"unpatrolled",
"page",
"newPage",
"anonymous",
"namespace",
"country",
"region",
"city"
]
},
"transformSpec": {
"transforms": [
{
"type": "expression",
"name": "newPage",
"expression": "page"
},
{
"type": "expression",
"name": "city",
"expression": "concat('city-', city)"
},
{
"type": "expression",
"name": "one-plus-triple-added",
"expression": "\"triple-added\" + 1"
},
{
"type": "expression",
"name": "delta",
"expression": "\"delta\" / 2"
},
{
"type": "expression",
"name": "double-deleted",
"expression": "deleted * 2"
}
]
}
}
}
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "ingestSegment",
"dataSource": "%%DATASOURCE%%",
"interval": "2013-08-31/2013-09-01"
}
},
"tuningConfig": {
"type": "index"
}
}
}