Fix stringFirst/stringLast rollup during ingestion (#10332)

* Add IndexMergerRollupTest

This changelist adds a test that merges indexes containing the stringFirst/stringLast aggregators.

* Fix StringFirstAggregateCombiner/StringLastAggregateCombiner

The segment-level (intermediate) type for stringFirst/stringLast is SerializablePairLongString,
not String. This changelist fixes the combiners to fold values of that type.
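
For context, a minimal sketch of how two such intermediate values compare. It assumes SerializablePairLongString's (Long, String) pair constructor; TIME_COMPARATOR is the comparator referenced in the combiner diffs below, and the class name here is illustrative.

import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;

public class PairSketch
{
  public static void main(String[] args)
  {
    // Each intermediate value pairs an event timestamp (lhs) with a string (rhs).
    SerializablePairLongString a = new SerializablePairLongString(1000L, "early");
    SerializablePairLongString b = new SerializablePairLongString(2000L, "late");
    // stringFirst keeps the pair with the smaller timestamp; stringLast the larger.
    int cmp = StringFirstAggregatorFactory.TIME_COMPARATOR.compare(a, b);
    System.out.println(cmp <= 0 ? a : b); // the pair holding "early"
  }
}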

* Fix EarliestLatestAnySqlAggregator to handle COMPLEX type

This changelist allows EarliestLatestAnySqlAggregator to accept a COMPLEX-typed
operand. Its return type is set to VARCHAR, since a COMPLEX column here can only
have been produced by stringFirst/stringLast during ingestion rollup, and those
aggregators ultimately finalize to strings.
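
A self-contained sketch of that return-type rule. Druid's SQL layer represents COMPLEX columns with Calcite's OTHER type, so SqlTypeName.OTHER stands in for COMPLEX here; the class and method names are illustrative, not the committed code.

import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;

public class ReturnTypeSketch
{
  // Anything neither numeric nor string maps to VARCHAR; other operand types pass through.
  static RelDataType inferReturnType(RelDataTypeFactory typeFactory, RelDataType operandType)
  {
    if (!SqlTypeUtil.isNumeric(operandType) && !SqlTypeUtil.isString(operandType)) {
      return typeFactory.createSqlType(SqlTypeName.VARCHAR);
    }
    return operandType;
  }

  public static void main(String[] args)
  {
    RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
    RelDataType complexStandIn = typeFactory.createSqlType(SqlTypeName.OTHER);
    System.out.println(inferReturnType(typeFactory, complexStandIn).getSqlTypeName()); // VARCHAR
  }
}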

* Return the value with the smaller timestamp from the StringFirstAggregatorFactory.combine function
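
The combine change itself is not among the hunks excerpted below. A hedged sketch of the intended behavior, using a stand-in pair type (all names here are illustrative, not the committed code):

import java.util.Comparator;

final class CombineSketch
{
  static final class PairLongString
  {
    final long timestamp;
    final String value;

    PairLongString(long timestamp, String value)
    {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Mirrors StringFirstAggregatorFactory.TIME_COMPARATOR: order pairs by timestamp.
  static final Comparator<PairLongString> TIME_COMPARATOR =
      Comparator.comparingLong(p -> p.timestamp);

  // stringFirst: keep whichever operand carries the smaller timestamp.
  // (stringLast flips the comparison to keep the larger one.)
  static PairLongString combine(PairLongString lhs, PairLongString rhs)
  {
    return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs;
  }
}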

* Add integration tests for stringFirst/stringLast during ingestion

* Use one shared EarliestLatestReturnTypeInference instance instead of allocating a new one per aggregator function

Co-authored-by: Joy Kent <joy@automonic.ai>
Authored by Joy Kent on 2020-09-08 17:36:04 -07:00; committed by GitHub
commit e5f0da30ae (parent d32d1e7004)
12 changed files with 506 additions and 33 deletions

View File

@@ -45,6 +45,15 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_reindex_queries.json";
private static final String REINDEX_DATASOURCE = "wikipedia_reindex_test";
private static final String MERGE_INDEX_TASK = "/indexer/wikipedia_merge_index_task.json";
private static final String MERGE_INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
private static final String MERGE_INDEX_DATASOURCE = "wikipedia_merge_index_test";
private static final String MERGE_REINDEX_TASK = "/indexer/wikipedia_merge_reindex_task.json";
private static final String MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_merge_reindex_druid_input_source_task.json";
private static final String MERGE_REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
private static final String MERGE_REINDEX_DATASOURCE = "wikipedia_merge_reindex_test";
@Test
public void testIndexData() throws Exception
{
@@ -110,4 +119,37 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
);
}
}
@Test
public void testMergeIndexData() throws Exception
{
final String reindexDatasource = MERGE_REINDEX_DATASOURCE + "-testMergeIndexData";
final String reindexDatasourceWithDruidInputSource = MERGE_REINDEX_DATASOURCE + "-testMergeReIndexData-druidInputSource";
try (
final Closeable ignored1 = unloader(MERGE_INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
) {
doIndexTest(
MERGE_INDEX_DATASOURCE,
MERGE_INDEX_TASK,
MERGE_INDEX_QUERIES_RESOURCE,
false,
true,
true
);
doReindexTest(
MERGE_INDEX_DATASOURCE,
reindexDatasource,
MERGE_REINDEX_TASK,
MERGE_REINDEX_QUERIES_RESOURCE
);
doReindexTest(
MERGE_INDEX_DATASOURCE,
reindexDatasourceWithDruidInputSource,
MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
MERGE_INDEX_QUERIES_RESOURCE
);
}
}
}

View File

@@ -113,5 +113,38 @@
"rows" : 1
}
} ]
},
{
"description": "timeseries, stringFirst/stringLast aggs, all",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter":null,
"aggregations":[
{
"type": "stringFirst",
"name": "first_user",
"fieldName": "user"
},
{
"type":"stringLast",
"name":"last_user",
"fieldName":"user"
}
]
},
"expectedResults":[
{
"timestamp" : "2013-08-31T00:00:00.000Z",
"result" : {
"first_user":"nuclear",
"last_user":"stringer"
}
}
]
}
]

View File

@@ -0,0 +1,42 @@
[
{
"description": "groupby, stringFirst/stringLast rollup aggs, all",
"query":{
"queryType" : "groupBy",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"dimensions":[
"continent"
],
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter":{
"type":"selector",
"dimension":"continent",
"value":"Asia"
},
"aggregations":[
{
"type": "stringFirst",
"name": "earliest_user",
"fieldName": "first_user"
},
{
"type":"stringLast",
"name":"latest_user",
"fieldName":"last_user"
}
]
},
"expectedResults":[ {
"version" : "v1",
"timestamp" : "2013-08-31T00:00:00.000Z",
"event" : {
"continent":"Asia",
"earliest_user":"masterYi",
"latest_user":"stringer"
}
} ]
}
]

View File

@@ -0,0 +1,70 @@
{
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
},
{
"type": "stringFirst",
"name": "first_user",
"fieldName": "user"
},
{
"type": "stringLast",
"name": "last_user",
"fieldName": "user"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "DAY",
"intervals" : [ "2013-08-31/2013-09-02" ]
},
"parser": {
"parseSpec": {
"format" : "json",
"timestampSpec": {
"column": "timestamp"
},
"dimensionsSpec": {
"dimensions": [
"continent"
]
}
}
}
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "local",
"baseDir": "/resources/data/batch_index/json",
"filter": "wikipedia_index_data*"
}
},
"tuningConfig": {
"type": "index",
"maxRowsPerSegment": 5,
"maxRowsInMemory": 2
}
}
}

View File

@@ -0,0 +1,63 @@
{
"type": "index",
"spec": {
"ioConfig": {
"type": "index",
"inputSource": {
"type": "druid",
"dataSource": "%%DATASOURCE%%",
"interval": "2013-08-31/2013-09-01"
}
},
"tuningConfig": {
"type": "index",
"partitionsSpec": {
"type": "dynamic"
}
},
"dataSchema": {
"dataSource": "%%REINDEX_DATASOURCE%%",
"granularitySpec": {
"type": "uniform",
"queryGranularity": "DAY",
"segmentGranularity": "DAY"
},
"timestampSpec": {
"column": "__time",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"continent"
]
},
"metricsSpec": [
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
},
{
"type": "stringFirst",
"name": "first_user",
"fieldName": "first_user"
},
{
"type": "stringLast",
"name": "last_user",
"fieldName": "last_user"
}
]
}
}
}

View File

@@ -0,0 +1,65 @@
{
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "%%REINDEX_DATASOURCE%%",
"metricsSpec": [
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
},
{
"type": "stringFirst",
"name": "first_user",
"fieldName": "first_user"
},
{
"type": "stringLast",
"name": "last_user",
"fieldName": "last_user"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "DAY",
"intervals" : [ "2013-08-31/2013-09-01" ]
},
"parser": {
"parseSpec": {
"format" : "json",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"continent"
]
}
}
}
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "ingestSegment",
"dataSource": "%%DATASOURCE%%",
"interval": "2013-08-31/2013-09-01"
}
},
"tuningConfig": {
"type": "index"
}
}
}

View File

@@ -20,41 +20,40 @@
 package org.apache.druid.query.aggregation.first;
 import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
 import org.apache.druid.segment.ColumnValueSelector;
 import javax.annotation.Nullable;
-public class StringFirstAggregateCombiner extends ObjectAggregateCombiner<String>
+public class StringFirstAggregateCombiner extends ObjectAggregateCombiner<SerializablePairLongString>
 {
-  private String firstString;
-  private boolean isReset = false;
+  private SerializablePairLongString firstValue;
   @Override
   public void reset(ColumnValueSelector selector)
   {
-    firstString = (String) selector.getObject();
-    isReset = true;
+    firstValue = (SerializablePairLongString) selector.getObject();
   }
   @Override
   public void fold(ColumnValueSelector selector)
   {
-    if (!isReset) {
-      firstString = (String) selector.getObject();
-      isReset = true;
+    SerializablePairLongString newValue = (SerializablePairLongString) selector.getObject();
+    if (StringFirstAggregatorFactory.TIME_COMPARATOR.compare(firstValue, newValue) > 0) {
+      firstValue = newValue;
     }
   }
   @Nullable
   @Override
-  public String getObject()
+  public SerializablePairLongString getObject()
   {
-    return firstString;
+    return firstValue;
   }
   @Override
-  public Class<String> classOfObject()
+  public Class<SerializablePairLongString> classOfObject()
   {
-    return String.class;
+    return SerializablePairLongString.class;
   }
 }

View File

@@ -20,36 +20,41 @@
 package org.apache.druid.query.aggregation.last;
 import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import javax.annotation.Nullable;
-public class StringLastAggregateCombiner extends ObjectAggregateCombiner<String>
+public class StringLastAggregateCombiner extends ObjectAggregateCombiner<SerializablePairLongString>
 {
-  private String lastString;
+  private SerializablePairLongString lastValue;
   @Override
   public void reset(ColumnValueSelector selector)
   {
-    lastString = (String) selector.getObject();
+    lastValue = (SerializablePairLongString) selector.getObject();
   }
   @Override
   public void fold(ColumnValueSelector selector)
   {
-    lastString = (String) selector.getObject();
+    SerializablePairLongString newValue = (SerializablePairLongString) selector.getObject();
+    if (StringFirstAggregatorFactory.TIME_COMPARATOR.compare(lastValue, newValue) < 0) {
+      lastValue = (SerializablePairLongString) selector.getObject();
+    }
   }
   @Nullable
   @Override
-  public String getObject()
+  public SerializablePairLongString getObject()
   {
-    return lastString;
+    return lastValue;
   }
   @Override
-  public Class<String> classOfObject()
+  public Class<SerializablePairLongString> classOfObject()
   {
-    return String.class;
+    return SerializablePairLongString.class;
   }
 }

View File

@@ -19,6 +19,7 @@
package org.apache.druid.query.aggregation.first;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
@@ -62,6 +63,7 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest
@Before
public void setup()
{
NullHandling.initializeForTests();
stringFirstAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
combiningAggFactory = stringFirstAggFactory.getCombiningFactory();
timeSelector = new TestLongColumnSelector(times);
@@ -175,24 +177,23 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest
   @Test
   public void testStringFirstAggregateCombiner()
   {
-    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
-    TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(strings);
+    TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs);
     AggregateCombiner stringFirstAggregateCombiner =
         combiningAggFactory.makeAggregateCombiner();
     stringFirstAggregateCombiner.reset(columnSelector);
-    Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[0], stringFirstAggregateCombiner.getObject());
     columnSelector.increment();
     stringFirstAggregateCombiner.fold(columnSelector);
-    Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[0], stringFirstAggregateCombiner.getObject());
     stringFirstAggregateCombiner.reset(columnSelector);
-    Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[1], stringFirstAggregateCombiner.getObject());
   }
private void aggregate(

View File

@@ -19,6 +19,7 @@
package org.apache.druid.query.aggregation.last;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
@@ -61,6 +62,7 @@ public class StringLastAggregationTest
@Before
public void setup()
{
NullHandling.initializeForTests();
stringLastAggFactory = new StringLastAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
combiningAggFactory = stringLastAggFactory.getCombiningFactory();
timeSelector = new TestLongColumnSelector(times);
@@ -159,23 +161,22 @@ public class StringLastAggregationTest
   @Test
   public void testStringLastAggregateCombiner()
   {
-    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
-    TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(strings);
+    TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs);
     AggregateCombiner stringFirstAggregateCombiner = combiningAggFactory.makeAggregateCombiner();
     stringFirstAggregateCombiner.reset(columnSelector);
-    Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[0], stringFirstAggregateCombiner.getObject());
     columnSelector.increment();
     stringFirstAggregateCombiner.fold(columnSelector);
-    Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[1], stringFirstAggregateCombiner.getObject());
     stringFirstAggregateCombiner.reset(columnSelector);
-    Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[1], stringFirstAggregateCombiner.getObject());
   }
private void aggregate(

View File

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
import org.apache.druid.segment.data.IncrementalIndexTest;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class IndexMergerRollupTest extends InitializedNullHandlingTest
{
private IndexMerger indexMerger;
private IndexIO indexIO;
private IndexSpec indexSpec;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void setUp()
{
indexMerger = TestHelper
.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance());
indexIO = TestHelper.getTestIndexIO();
indexSpec = new IndexSpec();
}
private void testStringFirstLastRollup(
AggregatorFactory[] aggregatorFactories
) throws Exception
{
List<Map<String, Object>> eventsList = Arrays.asList(
new HashMap<String, Object>()
{
{
put("d", "d1");
put("m", "m1");
}
},
new HashMap<String, Object>()
{
{
put("d", "d1");
put("m", "m2");
}
}
);
final File tempDir = temporaryFolder.newFolder();
List<QueryableIndex> indexes = new ArrayList<>();
Instant time = Instant.now();
for (Map<String, Object> events : eventsList) {
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(aggregatorFactories);
toPersist.add(new MapBasedInputRow(time.toEpochMilli(), ImmutableList.of("d"), events));
indexes.add(indexIO.loadIndex(indexMerger.persist(toPersist, tempDir, indexSpec, null)));
}
File indexFile = indexMerger
.mergeQueryableIndex(indexes, true, aggregatorFactories, tempDir, indexSpec, null);
try (QueryableIndex mergedIndex = indexIO.loadIndex(indexFile)) {
Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows());
}
}
@Test
public void testStringFirstRollup() throws Exception
{
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
new StringFirstAggregatorFactory("m", "m", 1024)
};
testStringFirstLastRollup(aggregatorFactories);
}
@Test
public void testStringLastRollup() throws Exception
{
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
new StringLastAggregatorFactory("m", "m", 1024)
};
testStringFirstLastRollup(aggregatorFactories);
}
}
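
A note on the rollup assertion above: the two events share the same timestamp and the same dimension value ("d" -> "d1"), so merging with rollup enabled must fold them into a single row through the aggregators' combiners. Before this fix, the combiners cast the merge-time selector value to String while the selector actually yields SerializablePairLongString, so the merge would presumably fail with a ClassCastException rather than roll up.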

View File

@@ -28,9 +28,12 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.util.Optionality;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -85,6 +88,7 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
case DOUBLE:
return new DoubleFirstAggregatorFactory(name, fieldName);
case STRING:
case COMPLEX:
return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes);
default:
throw new ISE("Cannot build EARLIEST aggregatorFactory for type[%s]", type);
@@ -104,6 +108,7 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
case DOUBLE:
return new DoubleLastAggregatorFactory(name, fieldName);
case STRING:
case COMPLEX:
return new StringLastAggregatorFactory(name, fieldName, maxStringBytes);
default:
throw new ISE("Cannot build LATEST aggregatorFactory for type[%s]", type);
@@ -219,22 +224,48 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
);
}
static class EarliestLatestReturnTypeInference implements SqlReturnTypeInference
{
private final int ordinal;
public EarliestLatestReturnTypeInference(int ordinal)
{
this.ordinal = ordinal;
}
@Override
public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
{
RelDataType type = sqlOperatorBinding.getOperandType(this.ordinal);
// For a type that is neither numeric nor string (i.e. COMPLEX), set the return type to VARCHAR.
if (!SqlTypeUtil.isNumeric(type) &&
!SqlTypeUtil.isString(type)) {
return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
} else {
return type;
}
}
}
private static class EarliestLatestSqlAggFunction extends SqlAggFunction
{
private static final EarliestLatestReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE =
new EarliestLatestReturnTypeInference(0);
EarliestLatestSqlAggFunction(AggregatorType aggregatorType)
{
super(
aggregatorType.name(),
null,
SqlKind.OTHER_FUNCTION,
- ReturnTypes.ARG0,
+ EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE,
InferTypes.RETURN_TYPE,
OperandTypes.or(
OperandTypes.NUMERIC,
OperandTypes.BOOLEAN,
OperandTypes.sequence(
"'" + aggregatorType.name() + "(expr, maxBytesPerString)'\n",
OperandTypes.STRING,
OperandTypes.ANY,
OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
)
),