diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index 95b16dee3d7..a602d4a5aa0 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -45,6 +45,15 @@ public class ITIndexerTest extends AbstractITBatchIndexTest private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_reindex_queries.json"; private static final String REINDEX_DATASOURCE = "wikipedia_reindex_test"; + private static final String MERGE_INDEX_TASK = "/indexer/wikipedia_merge_index_task.json"; + private static final String MERGE_INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json"; + private static final String MERGE_INDEX_DATASOURCE = "wikipedia_merge_index_test"; + + private static final String MERGE_REINDEX_TASK = "/indexer/wikipedia_merge_reindex_task.json"; + private static final String MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_merge_reindex_druid_input_source_task.json"; + private static final String MERGE_REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json"; + private static final String MERGE_REINDEX_DATASOURCE = "wikipedia_merge_reindex_test"; + @Test public void testIndexData() throws Exception { @@ -110,4 +119,37 @@ public class ITIndexerTest extends AbstractITBatchIndexTest ); } } + + @Test + public void testMERGEIndexData() throws Exception + { + final String reindexDatasource = MERGE_REINDEX_DATASOURCE + "-testMergeIndexData"; + final String reindexDatasourceWithDruidInputSource = MERGE_REINDEX_DATASOURCE + "-testMergeReIndexData-druidInputSource"; + try ( + final Closeable ignored1 = unloader(MERGE_INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()); + final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix()); + final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix()) + ) { + doIndexTest( + MERGE_INDEX_DATASOURCE, + MERGE_INDEX_TASK, + MERGE_INDEX_QUERIES_RESOURCE, + false, + true, + true + ); + doReindexTest( + MERGE_INDEX_DATASOURCE, + reindexDatasource, + MERGE_REINDEX_TASK, + MERGE_REINDEX_QUERIES_RESOURCE + ); + doReindexTest( + MERGE_INDEX_DATASOURCE, + reindexDatasourceWithDruidInputSource, + MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE, + MERGE_INDEX_QUERIES_RESOURCE + ); + } + } } diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json index bf2a70b687a..928effe65e9 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json @@ -113,5 +113,38 @@ "rows" : 1 } } ] + }, + { + "description": "timeseries, stringFirst/stringLast aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "stringFirst", + "name": "first_user", + "fieldName": "user" + }, + { + "type":"stringLast", + "name":"last_user", + "fieldName":"user" + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "first_user":"nuclear", + "last_user":"stringer" + } + } + ] } ] \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json new file mode 100644 index 00000000000..ab4674999b5 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json @@ -0,0 +1,42 @@ +[ + { + "description": "groupby, stringFirst/stringLast rollup aggs, all", + "query":{ + "queryType" : "groupBy", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "dimensions":[ + "continent" + ], + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":{ + "type":"selector", + "dimension":"continent", + "value":"Asia" + }, + "aggregations":[ + { + "type": "stringFirst", + "name": "earliest_user", + "fieldName": "first_user" + }, + { + "type":"stringLast", + "name":"latest_user", + "fieldName":"last_user" + } + ] + }, + "expectedResults":[ { + "version" : "v1", + "timestamp" : "2013-08-31T00:00:00.000Z", + "event" : { + "continent":"Asia", + "earliest_user":"masterYi", + "latest_user":"stringer" + } + } ] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json new file mode 100644 index 00000000000..43264a8c675 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json @@ -0,0 +1,70 @@ +{ + "type": "index", + "spec": { + "dataSchema": { + "dataSource": "%%DATASOURCE%%", + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + }, + { + "type": "stringFirst", + "name": "first_user", + "fieldName": "user" + }, + { + "type": "stringLast", + "name": "last_user", + "fieldName": "user" + } + ], + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "DAY", + "intervals" : [ "2013-08-31/2013-09-02" ] + }, + "parser": { + "parseSpec": { + "format" : "json", + "timestampSpec": { + "column": "timestamp" + }, + "dimensionsSpec": { + "dimensions": [ + "continent" + ] + } + } + } + }, + "ioConfig": { + "type": "index", + "firehose": { + "type": "local", + "baseDir": "/resources/data/batch_index/json", + "filter": "wikipedia_index_data*" + } + }, + "tuningConfig": { + "type": "index", + "maxRowsPerSegment": 5, + "maxRowsInMemory": 2 + } + } +} diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json new file mode 100644 index 00000000000..9daae62c8d4 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json @@ -0,0 +1,63 @@ +{ + "type": "index", + "spec": { + "ioConfig": { + "type": "index", + "inputSource": { + "type": "druid", + "dataSource": "%%DATASOURCE%%", + "interval": "2013-08-31/2013-09-01" + } + }, + "tuningConfig": { + "type": "index", + "partitionsSpec": { + "type": "dynamic" + } + }, + "dataSchema": { + "dataSource": "%%REINDEX_DATASOURCE%%", + "granularitySpec": { + "type": "uniform", + "queryGranularity": "DAY", + "segmentGranularity": "DAY" + }, + "timestampSpec": { + "column": "__time", + "format": "iso" + }, + "dimensionsSpec": { + "dimensions": [ + "continent" + ] + }, + "metricsSpec": [ + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + }, + { + "type": "stringFirst", + "name": "first_user", + "fieldName": "first_user" + }, + { + "type": "stringLast", + "name": "last_user", + "fieldName": "last_user" + } + ] + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json new file mode 100644 index 00000000000..127461dd117 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json @@ -0,0 +1,65 @@ +{ + "type": "index", + "spec": { + "dataSchema": { + "dataSource": "%%REINDEX_DATASOURCE%%", + "metricsSpec": [ + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + }, + { + "type": "stringFirst", + "name": "first_user", + "fieldName": "first_user" + }, + { + "type": "stringLast", + "name": "last_user", + "fieldName": "last_user" + } + ], + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "DAY", + "intervals" : [ "2013-08-31/2013-09-01" ] + }, + "parser": { + "parseSpec": { + "format" : "json", + "timestampSpec": { + "column": "timestamp", + "format": "iso" + }, + "dimensionsSpec": { + "dimensions": [ + "continent" + ] + } + } + } + }, + "ioConfig": { + "type": "index", + "firehose": { + "type": "ingestSegment", + "dataSource": "%%DATASOURCE%%", + "interval": "2013-08-31/2013-09-01" + } + }, + "tuningConfig": { + "type": "index" + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregateCombiner.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregateCombiner.java index 9a86a13be33..4ccf92e2b38 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregateCombiner.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregateCombiner.java @@ -20,41 +20,40 @@ package org.apache.druid.query.aggregation.first; import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.segment.ColumnValueSelector; import javax.annotation.Nullable; -public class StringFirstAggregateCombiner extends ObjectAggregateCombiner +public class StringFirstAggregateCombiner extends ObjectAggregateCombiner { - private String firstString; - private boolean isReset = false; + private SerializablePairLongString firstValue; @Override public void reset(ColumnValueSelector selector) { - firstString = (String) selector.getObject(); - isReset = true; + firstValue = (SerializablePairLongString) selector.getObject(); } @Override public void fold(ColumnValueSelector selector) { - if (!isReset) { - firstString = (String) selector.getObject(); - isReset = true; + SerializablePairLongString newValue = (SerializablePairLongString) selector.getObject(); + if (StringFirstAggregatorFactory.TIME_COMPARATOR.compare(firstValue, newValue) > 0) { + firstValue = newValue; } } @Nullable @Override - public String getObject() + public SerializablePairLongString getObject() { - return firstString; + return firstValue; } @Override - public Class classOfObject() + public Class classOfObject() { - return String.class; + return SerializablePairLongString.class; } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregateCombiner.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregateCombiner.java index e6ea2ff074d..47f21190395 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregateCombiner.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregateCombiner.java @@ -20,36 +20,41 @@ package org.apache.druid.query.aggregation.last; import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; import org.apache.druid.segment.ColumnValueSelector; import javax.annotation.Nullable; -public class StringLastAggregateCombiner extends ObjectAggregateCombiner +public class StringLastAggregateCombiner extends ObjectAggregateCombiner { - private String lastString; + private SerializablePairLongString lastValue; @Override public void reset(ColumnValueSelector selector) { - lastString = (String) selector.getObject(); + lastValue = (SerializablePairLongString) selector.getObject(); } @Override public void fold(ColumnValueSelector selector) { - lastString = (String) selector.getObject(); + SerializablePairLongString newValue = (SerializablePairLongString) selector.getObject(); + if (StringFirstAggregatorFactory.TIME_COMPARATOR.compare(lastValue, newValue) < 0) { + lastValue = (SerializablePairLongString) selector.getObject(); + } } @Nullable @Override - public String getObject() + public SerializablePairLongString getObject() { - return lastString; + return lastValue; } @Override - public Class classOfObject() + public Class classOfObject() { - return String.class; + return SerializablePairLongString.class; } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java index 039054da111..854cdbf9b1f 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.aggregation.first; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; @@ -62,6 +63,7 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest @Before public void setup() { + NullHandling.initializeForTests(); stringFirstAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE); combiningAggFactory = stringFirstAggFactory.getCombiningFactory(); timeSelector = new TestLongColumnSelector(times); @@ -175,24 +177,23 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest @Test public void testStringFirstAggregateCombiner() { - final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"}; - TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(strings); + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs); AggregateCombiner stringFirstAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); stringFirstAggregateCombiner.reset(columnSelector); - Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject()); + Assert.assertEquals(pairs[0], stringFirstAggregateCombiner.getObject()); columnSelector.increment(); stringFirstAggregateCombiner.fold(columnSelector); - Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject()); + Assert.assertEquals(pairs[0], stringFirstAggregateCombiner.getObject()); stringFirstAggregateCombiner.reset(columnSelector); - Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject()); + Assert.assertEquals(pairs[1], stringFirstAggregateCombiner.getObject()); } private void aggregate( diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java index 39f9925606e..dbd2522634e 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.aggregation.last; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; @@ -61,6 +62,7 @@ public class StringLastAggregationTest @Before public void setup() { + NullHandling.initializeForTests(); stringLastAggFactory = new StringLastAggregatorFactory("billy", "nilly", MAX_STRING_SIZE); combiningAggFactory = stringLastAggFactory.getCombiningFactory(); timeSelector = new TestLongColumnSelector(times); @@ -159,23 +161,22 @@ public class StringLastAggregationTest @Test public void testStringLastAggregateCombiner() { - final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"}; - TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(strings); + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs); AggregateCombiner stringFirstAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); stringFirstAggregateCombiner.reset(columnSelector); - Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject()); + Assert.assertEquals(pairs[0], stringFirstAggregateCombiner.getObject()); columnSelector.increment(); stringFirstAggregateCombiner.fold(columnSelector); - Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject()); + Assert.assertEquals(pairs[1], stringFirstAggregateCombiner.getObject()); stringFirstAggregateCombiner.reset(columnSelector); - Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject()); + Assert.assertEquals(pairs[1], stringFirstAggregateCombiner.getObject()); } private void aggregate( diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java new file mode 100644 index 00000000000..547abcbcfca --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; +import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; +import org.apache.druid.segment.data.IncrementalIndexTest; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IndexMergerRollupTest extends InitializedNullHandlingTest +{ + + private IndexMerger indexMerger; + private IndexIO indexIO; + private IndexSpec indexSpec; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Before + public void setUp() + { + indexMerger = TestHelper + .getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance()); + indexIO = TestHelper.getTestIndexIO(); + indexSpec = new IndexSpec(); + } + + private void testStringFirstLastRollup( + AggregatorFactory[] aggregatorFactories + ) throws Exception + { + List> eventsList = Arrays.asList( + new HashMap() + { + { + put("d", "d1"); + put("m", "m1"); + } + }, + new HashMap() + { + { + put("d", "d1"); + put("m", "m2"); + } + } + ); + + final File tempDir = temporaryFolder.newFolder(); + + List indexes = new ArrayList<>(); + Instant time = Instant.now(); + + for (Map events : eventsList) { + IncrementalIndex toPersist = IncrementalIndexTest.createIndex(aggregatorFactories); + + toPersist.add(new MapBasedInputRow(time.toEpochMilli(), ImmutableList.of("d"), events)); + indexes.add(indexIO.loadIndex(indexMerger.persist(toPersist, tempDir, indexSpec, null))); + } + + File indexFile = indexMerger + .mergeQueryableIndex(indexes, true, aggregatorFactories, tempDir, indexSpec, null); + try (QueryableIndex mergedIndex = indexIO.loadIndex(indexFile)) { + Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); + } + } + + @Test + public void testStringFirstRollup() throws Exception + { + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + new StringFirstAggregatorFactory("m", "m", 1024) + }; + testStringFirstLastRollup(aggregatorFactories); + } + + @Test + public void testStringLastRollup() throws Exception + { + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + new StringLastAggregatorFactory("m", "m", 1024) + }; + testStringFirstLastRollup(aggregatorFactories); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java index 800ee7d0c26..8ec917e1688 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java @@ -28,9 +28,12 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperatorBinding; import org.apache.calcite.sql.type.InferTypes; import org.apache.calcite.sql.type.OperandTypes; -import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlReturnTypeInference; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.util.Optionality; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -85,6 +88,7 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator case DOUBLE: return new DoubleFirstAggregatorFactory(name, fieldName); case STRING: + case COMPLEX: return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes); default: throw new ISE("Cannot build EARLIEST aggregatorFactory for type[%s]", type); @@ -104,6 +108,7 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator case DOUBLE: return new DoubleLastAggregatorFactory(name, fieldName); case STRING: + case COMPLEX: return new StringLastAggregatorFactory(name, fieldName, maxStringBytes); default: throw new ISE("Cannot build LATEST aggregatorFactory for type[%s]", type); @@ -219,22 +224,48 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator ); } + static class EarliestLatestReturnTypeInference implements SqlReturnTypeInference + { + private final int ordinal; + + public EarliestLatestReturnTypeInference(int ordinal) + { + this.ordinal = ordinal; + } + + @Override + public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding) + { + RelDataType type = sqlOperatorBinding.getOperandType(this.ordinal); + // For non-number and non-string type, which is COMPLEX type, we set the return type to VARCHAR. + if (!SqlTypeUtil.isNumeric(type) && + !SqlTypeUtil.isString(type)) { + return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.VARCHAR); + } else { + return type; + } + } + } + private static class EarliestLatestSqlAggFunction extends SqlAggFunction { + private static final EarliestLatestReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE = + new EarliestLatestReturnTypeInference(0); + EarliestLatestSqlAggFunction(AggregatorType aggregatorType) { super( aggregatorType.name(), null, SqlKind.OTHER_FUNCTION, - ReturnTypes.ARG0, + EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE, InferTypes.RETURN_TYPE, OperandTypes.or( OperandTypes.NUMERIC, OperandTypes.BOOLEAN, OperandTypes.sequence( "'" + aggregatorType.name() + "(expr, maxBytesPerString)'\n", - OperandTypes.STRING, + OperandTypes.ANY, OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL) ) ),