Fix CombiningFirehose compatibility (#10264)

* Fix CombiningFirehose

* Add integration test

* Fix path

* Add full datasource name

* Fix input location

Co-authored-by: Atul Mohan <atulmohan@yahoo-inc.com>
This commit is contained in:
Atul Mohan 2020-08-20 12:37:38 -05:00 committed by GitHub
parent b36dab0fe6
commit 618c04a99e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 384 additions and 10 deletions

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.util.function.Function;
/**
 * Integration test for an index task whose firehose is of type "combining".
 *
 * The test runs two index tasks in sequence: a plain one into {@link #INDEX_DATASOURCE}, and then
 * the combining-firehose task into {@link #COMBINING_INDEX_DATASOURCE}. The second task spec carries a
 * {@code %%COMBINING_DATASOURCE%%} placeholder which is replaced with the suffix-qualified name of the
 * first datasource before the spec is handed to {@code doIndexTest}.
 */
@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITCombiningFirehoseFactoryIndexTest extends AbstractITBatchIndexTest
{
  // Resources and datasource name for the initial (plain) ingestion.
  private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
  private static final String INDEX_DATASOURCE = "wikipedia_index_test";

  // Resources and datasource name for the ingestion that goes through the combining firehose.
  private static final String COMBINING_INDEX_TASK = "/indexer/wikipedia_combining_firehose_index_task.json";
  private static final String COMBINING_QUERIES_RESOURCE = "/indexer/wikipedia_combining_firehose_index_queries.json";
  private static final String COMBINING_INDEX_DATASOURCE = "wikipedia_comb_index_test";

  /**
   * Indexes the base datasource, then indexes the combining datasource using a task spec that points
   * back at the base datasource.
   *
   * @throws Exception if either index run or closing one of the unloader handles fails
   */
  @Test
  public void testIndexData() throws Exception
  {
    // NOTE(review): the unloader handles presumably drop both datasources when the block exits;
    // confirm against AbstractITBatchIndexTest.
    try (
        final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
        final Closeable ignored2 = unloader(COMBINING_INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())
    ) {
      final Function<String, String> specTransform = this::pointSpecAtIndexedDatasource;

      // The base datasource has to be ingested first: the combining task spec refers to it by name.
      doIndexTest(INDEX_DATASOURCE, INDEX_TASK, INDEX_QUERIES_RESOURCE, false, true, true);

      doIndexTest(
          COMBINING_INDEX_DATASOURCE,
          COMBINING_INDEX_TASK,
          specTransform,
          COMBINING_QUERIES_RESOURCE,
          false,
          true,
          true
      );
    }
  }

  /**
   * Substitutes the {@code %%COMBINING_DATASOURCE%%} placeholder in a task spec with the
   * suffix-qualified name of the base datasource.
   *
   * @param spec raw task spec text
   * @return the spec with the placeholder replaced
   * @throws RuntimeException wrapping any exception raised during the substitution
   */
  private String pointSpecAtIndexedDatasource(String spec)
  {
    try {
      final String baseDatasource = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix();
      return StringUtils.replace(spec, "%%COMBINING_DATASOURCE%%", baseDatasource);
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

View File

@ -0,0 +1,141 @@
[
{
"description": "timeBoundary, min and max time",
"query": {
"queryType": "timeBoundary",
"dataSource": "%%DATASOURCE%%"
},
"expectedResults": [
{
"timestamp": "2013-08-31T01:02:33.000Z",
"result": {
"minTime": "2013-08-31T01:02:33.000Z",
"maxTime": "2013-09-01T18:22:39.000Z"
}
}
]
},
{
"description": "timeseries, datasketch aggs, all",
"query": {
"queryType": "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity": "day",
"intervals": [
"2013-09-01T00:00/2013-09-02T00:00"
],
"filter": null,
"aggregations": [
{
"type": "HLLSketchMerge",
"name": "approxCountHLL",
"fieldName": "HLLSketchBuild",
"lgK": 12,
"tgtHllType": "HLL_4",
"round": true
},
{
"type": "thetaSketch",
"name": "approxCountTheta",
"fieldName": "thetaSketch",
"size": 16384,
"shouldFinalize": true,
"isInputThetaSketch": false,
"errorBoundsStdDev": null
},
{
"type": "quantilesDoublesSketch",
"name": "quantilesSketch",
"fieldName": "quantilesDoublesSketch",
"k": 128
}
]
},
"expectedResults": [
{
"timestamp": "2013-09-01T00:00:00.000Z",
"result": {
"quantilesSketch": 6,
"approxCountTheta": 6.0,
"approxCountHLL": 6
}
}
]
},
{
"description": "having spec on post aggregation",
"query": {
"queryType": "groupBy",
"dataSource": "%%DATASOURCE%%",
"granularity": "day",
"dimensions": [
"page"
],
"filter": {
"type": "selector",
"dimension": "language",
"value": "zh"
},
"aggregations": [
{
"type": "count",
"name": "rows"
},
{
"type": "longSum",
"fieldName": "added",
"name": "added_count"
}
],
"postAggregations": [
{
"type": "arithmetic",
"name": "added_count_times_ten",
"fn": "*",
"fields": [
{
"type": "fieldAccess",
"name": "added_count",
"fieldName": "added_count"
},
{
"type": "constant",
"name": "const",
"value": 10
}
]
}
],
"having": {
"type": "greaterThan",
"aggregation": "added_count_times_ten",
"value": 9000
},
"intervals": [
"2013-08-31T00:00/2013-09-01T00:00"
]
},
"expectedResults": [
{
"version": "v1",
"timestamp": "2013-08-31T00:00:00.000Z",
"event": {
"added_count_times_ten": 9050.0,
"page": "Crimson Typhoon",
"added_count": 905,
"rows": 1
}
},
{
"version": "v1",
"timestamp": "2013-08-31T00:00:00.000Z",
"event": {
"added_count_times_ten": 9770.0,
"page": "Gypsy Danger",
"added_count": 977,
"rows": 1
}
}
]
}
]

View File

@ -0,0 +1,95 @@
{
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
},
{
"name": "thetaSketch",
"type": "thetaSketch",
"fieldName": "user"
},
{
"name": "quantilesDoublesSketch",
"type": "quantilesDoublesSketch",
"fieldName": "delta"
},
{
"name": "HLLSketchBuild",
"type": "HLLSketchBuild",
"fieldName": "user"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "second",
"intervals" : [ "2013-08-31/2013-09-02" ]
},
"parser": {
"parseSpec": {
"format" : "json",
"timestampSpec": {
"column": "timestamp"
},
"dimensionsSpec": {
"dimensions": [
"page",
{"type": "string", "name": "language", "createBitmapIndex": false},
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
]
}
}
}
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "combining",
"delegates": [
{
"type": "local",
"baseDir": "/resources/indexer",
"filter": "wikipedia_combining_index_data.json"
},
{
"type": "ingestSegment",
"dataSource": "%%COMBINING_DATASOURCE%%",
"interval": "2013-08-31/2013-09-02"
}
]
}
},
"tuningConfig": {
"type": "index",
"maxRowsPerSegment": 3
}
}
}

View File

@ -0,0 +1,3 @@
{"timestamp": "2013-08-31T05:18:22Z", "page": "Gypsy Danger", "language" : "zh", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 977, "deleted": 200, "delta": -143}
{"timestamp": "2013-08-31T17:57:01Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
{"timestamp": "2013-09-01T18:22:39Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYo", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}

View File

@ -22,9 +22,12 @@ package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -33,11 +36,12 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
/**
* Creates firehose that combines data from different Firehoses. Useful for ingesting data from multiple sources.
*/
public class CombiningFirehoseFactory implements FirehoseFactory<InputRowParser>
public class CombiningFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<FirehoseFactory>>
{
private static final EmittingLogger log = new EmittingLogger(CombiningFirehoseFactory.class);
@ -64,6 +68,32 @@ public class CombiningFirehoseFactory implements FirehoseFactory<InputRowParser>
return delegateFactoryList;
}
/**
 * Always reports this factory as not splittable.
 *
 * @return {@code false}
 */
@Override
public boolean isSplittable()
{
return false;
}
/**
 * Produces a stream holding exactly one split, which wraps the complete list of delegate factories.
 *
 * @param splitHintSpec split hint; not consulted by this implementation
 * @return a single-element stream whose split carries {@code delegateFactoryList}
 */
@Override
public Stream<InputSplit<List<FirehoseFactory>>> getSplits(@Nullable SplitHintSpec splitHintSpec)
{
  final InputSplit<List<FirehoseFactory>> allDelegates = new InputSplit<>(delegateFactoryList);
  return Stream.of(allDelegates);
}
/**
 * Reports the number of splits, which is fixed at one.
 *
 * @param splitHintSpec split hint; not consulted by this implementation
 * @return {@code 1}
 */
@Override
public int getNumSplits(@Nullable SplitHintSpec splitHintSpec)
{
return 1;
}
/**
 * Builds a new combining factory from the delegate factories carried by the given split.
 *
 * @param split split whose payload is the list of delegate factories to combine
 * @return a new {@code CombiningFirehoseFactory} over {@code split.get()}
 */
@Override
public FiniteFirehoseFactory<InputRowParser, List<FirehoseFactory>> withSplit(InputSplit<List<FirehoseFactory>> split)
{
  final List<FirehoseFactory> delegates = split.get();
  return new CombiningFirehoseFactory(delegates);
}
class CombiningFirehose implements Firehose
{
private final InputRowParser parser;

View File

@ -19,15 +19,18 @@
package org.apache.druid.segment.realtime.firehose;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
@ -38,22 +41,27 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
public class CombiningFirehoseFactoryTest
{
private CombiningFirehoseFactory combiningFirehoseFactory;
private List<FirehoseFactory> delegateFirehoses;
/**
 * Builds the shared fixture: two list-backed delegate factories (rows 1-2 and rows 3-5) and a
 * combining factory wrapping both, in that order.
 */
@Before
public void setUp()
{
  final List<InputRow> firstRows = Arrays.asList(makeRow(1, 1), makeRow(2, 2));
  final List<InputRow> secondRows = Arrays.asList(makeRow(3, 3), makeRow(4, 4), makeRow(5, 5));

  final FirehoseFactory firstDelegate = new ListFirehoseFactory(firstRows);
  final FirehoseFactory secondDelegate = new ListFirehoseFactory(secondRows);

  delegateFirehoses = Arrays.asList(firstDelegate, secondDelegate);
  combiningFirehoseFactory = new CombiningFirehoseFactory(delegateFirehoses);
}
@Test
public void testCombiningfirehose() throws IOException
{
List<InputRow> list1 = Arrays.asList(makeRow(1, 1), makeRow(2, 2));
List<InputRow> list2 = Arrays.asList(makeRow(3, 3), makeRow(4, 4), makeRow(5, 5));
FirehoseFactory combiningFactory = new CombiningFirehoseFactory(
Arrays.asList(
new ListFirehoseFactory(list1),
new ListFirehoseFactory(list2)
)
);
final Firehose firehose = combiningFactory.connect(null, null);
final Firehose firehose = combiningFirehoseFactory.connect(null, null);
for (int i = 1; i < 6; i++) {
Assert.assertTrue(firehose.hasMore());
final InputRow inputRow = firehose.nextRow();
@ -63,6 +71,21 @@ public class CombiningFirehoseFactoryTest
Assert.assertFalse(firehose.hasMore());
}
/**
 * Checks the split-related contract of the combining factory: it yields at least one split, a factory
 * rebuilt from that split is again a {@code CombiningFirehoseFactory} with the same delegate list, and
 * the factory reports itself as not splittable.
 */
@Test
public void testFirehoseNotParallelizable()
{
  final Optional<InputSplit<List<FirehoseFactory>>> firstSplit =
      combiningFirehoseFactory.getSplits(null).findFirst();
  Assert.assertTrue(firstSplit.isPresent());

  final FiniteFirehoseFactory<InputRowParser, List<FirehoseFactory>> rebuilt =
      combiningFirehoseFactory.withSplit(firstSplit.get());
  Assert.assertTrue(rebuilt instanceof CombiningFirehoseFactory);
  Assert.assertFalse(combiningFirehoseFactory.isSplittable());

  // The cast is safe here: the instanceof assertion above has already passed.
  final CombiningFirehoseFactory rebuiltCombining = (CombiningFirehoseFactory) rebuilt;
  Assert.assertEquals(delegateFirehoses, rebuiltCombining.getDelegateFactoryList());
}
private InputRow makeRow(final long timestamp, final float metricValue)
{
return new InputRow()