Add sql + ingestion compatibility for first/last on numeric values (#15607)

Adds SQL compatibility for the numeric first and last aggregator column types: EARLIEST and LATEST over these columns now return the underlying numeric type instead of VARCHAR. The ingestion UI (data loader) now offers the first and last aggregations as well.
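For illustration, the query shape this enables — previously, EARLIEST/LATEST on a first/last ingestion metric planned the complex column as VARCHAR. A minimal sketch using the Avatica JDBC driver; the Broker URL, datasource, and column names are assumptions for the example:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class NumericFirstLastExample
{
  public static void main(String[] args) throws Exception
  {
    // Assumed Broker Avatica endpoint; adjust host and port for your cluster.
    final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
    try (Connection connection = DriverManager.getConnection(url);
         Statement statement = connection.createStatement();
         // long_last_added is a longLast ingestion metric; LATEST over it
         // now plans as BIGINT rather than VARCHAR.
         ResultSet resultSet = statement.executeQuery(
             "SELECT isNew, LATEST(long_last_added) FROM wikipedia_first_last GROUP BY isNew"
         )) {
      while (resultSet.next()) {
        System.out.println(resultSet.getString(1) + " -> " + resultSet.getLong(2));
      }
    }
  }
}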
Ankit Kothari 2024-01-09 23:29:38 -08:00 committed by GitHub
parent 047c7340ab
commit 355c2f5da0
8 changed files with 204 additions and 10 deletions


@@ -311,6 +311,9 @@ public class CalciteMSQTestsHelper
.inputTmpDir(temporaryFolder.newFolder())
.buildMMappedIndex();
break;
case CalciteTests.WIKIPEDIA_FIRST_LAST:
index = TestDataBuilder.makeWikipediaIndexWithAggregation(temporaryFolder.newFolder());
break;
default:
throw new ISE("Cannot query segment %s in test runner", segmentId);


@@ -44,6 +44,9 @@ import org.apache.calcite.util.Optionality;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde;
import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde;
import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde;
import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
@@ -68,6 +71,7 @@ import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignatures;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -316,6 +320,25 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
{
RelDataType type = sqlOperatorBinding.getOperandType(this.ordinal);
// If complex and of type SerializablePairLong*, return scalar type
if (type instanceof RowSignatures.ComplexSqlType) {
ColumnType complexColumnType = ((RowSignatures.ComplexSqlType) type).getColumnType();
String complexTypeName = complexColumnType.getComplexTypeName();
if (complexTypeName != null) {
switch (complexTypeName) {
case SerializablePairLongLongComplexMetricSerde.TYPE_NAME:
return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME:
return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.FLOAT);
case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME:
return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.DOUBLE);
default:
return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
}
}
}
// For any other type that is neither numeric nor string (i.e., other COMPLEX types), set the return type to VARCHAR.
if (!SqlTypeUtil.isNumeric(type) &&
!SqlTypeUtil.isString(type)) {
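The special-casing above works because first/last ingestion metrics store a (timestamp, value) pair per rolled-up row, and the finalized EARLIEST/LATEST result is just the value half — so only the value's scalar type needs to surface to SQL. A sketch of that storage shape, assuming the pair classes expose the public lhs/rhs fields of Druid's Pair; the literal values are illustrative:

import org.apache.druid.query.aggregation.SerializablePairLongLong;

class StoredPairSketch
{
  static long finalizedValue()
  {
    // What a longFirst/longLast metric cell holds after ingestion-time
    // rollup: the winning row's __time plus the scalar metric value.
    final SerializablePairLongLong cell = new SerializablePairLongLong(
        1442016000000L, // lhs: __time, 2015-09-12T00:00:00Z (illustrative)
        36L             // rhs: the "added" value at that time
    );
    // EARLIEST/LATEST finalize to the value, which is why the SQL return
    // type can be the plain scalar type (BIGINT for the long pair).
    return cell.rhs;
  }
}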


@@ -581,7 +581,13 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", "wikipedia"),
Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
)
@@ -661,7 +667,13 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", "wikipedia"),
Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
)


@@ -179,7 +179,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", "wikipedia", "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
@@ -217,7 +218,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", "wikipedia", "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
@@ -1075,6 +1077,75 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testNumericLatestEarliestGroupBy()
{
testQuery(
"SELECT isNew, LATEST(long_last_added), EARLIEST(long_first_added), LATEST(float_last_added), EARLIEST(float_first_added), LATEST(double_last_added), EARLIEST(double_first_added) FROM wikipedia_first_last GROUP BY isNew",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("isNew", "d0")))
.setAggregatorSpecs(aggregators(
new LongLastAggregatorFactory("a0", "long_last_added", null),
new LongFirstAggregatorFactory("a1", "long_first_added", null),
new FloatLastAggregatorFactory("a2", "float_last_added", null),
new FloatFirstAggregatorFactory("a3", "float_first_added", null),
new DoubleLastAggregatorFactory("a4", "double_last_added", null),
new DoubleFirstAggregatorFactory("a5", "double_first_added", null)
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"false", 182L, 36L, 182.0F, 36.0F, 182.0D, 36.0D},
new Object[]{"true", 113L, 345L, 113.0F, 345.0F, 113.0D, 345.0D}
)
);
}
@Test
public void testNumericLatestEarliestWithOperatorsGroupBy()
{
testQuery(
"SELECT isNew, LATEST(long_last_added)+4, EARLIEST(long_first_added)-4, LATEST(float_last_added)*2, EARLIEST(float_first_added)/2f, LATEST(double_last_added)+2.5, EARLIEST(double_first_added)-2.5 FROM wikipedia_first_last GROUP BY isNew",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("isNew", "d0")))
.setAggregatorSpecs(aggregators(
new LongLastAggregatorFactory("a0", "long_last_added", null),
new LongFirstAggregatorFactory("a1", "long_first_added", null),
new FloatLastAggregatorFactory("a2", "float_last_added", null),
new FloatFirstAggregatorFactory("a3", "float_first_added", null),
new DoubleLastAggregatorFactory("a4", "double_last_added", null),
new DoubleFirstAggregatorFactory("a5", "double_first_added", null)
)
)
.setPostAggregatorSpecs(
expressionPostAgg("p0", "(\"a0\" + 4)", ColumnType.LONG),
expressionPostAgg("p1", "(\"a1\" - 4)", ColumnType.LONG),
expressionPostAgg("p2", "(\"a2\" * 2)", ColumnType.FLOAT),
expressionPostAgg("p3", "(\"a3\" / 2)", ColumnType.FLOAT),
expressionPostAgg("p4", "(\"a4\" + 2.5)", ColumnType.DOUBLE),
expressionPostAgg("p5", "(\"a5\" - 2.5)", ColumnType.DOUBLE)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"false", 186L, 32L, 364.0F, 18.0F, 184.5D, 33.5D},
new Object[]{"true", 117L, 341L, 226.0F, 172.5F, 115.5D, 342.5D}
)
);
}
@Test
public void testStringLatestGroupByWithAlwaysFalseCondition()
{


@@ -647,7 +647,7 @@ public class CalciteSimpleQueryTest extends BaseCalciteQueryTest
.expectedQueries(
ImmutableList.of(
GroupByQuery.builder()
.setDataSource("wikipedia")
.setDataSource(CalciteTests.WIKIPEDIA)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setVirtualColumns(


@@ -117,6 +117,8 @@ public class CalciteTests
public static final String SOMEXDATASOURCE = "somexdatasource";
public static final String USERVISITDATASOURCE = "visits";
public static final String DRUID_SCHEMA_NAME = "druid";
public static final String WIKIPEDIA = "wikipedia";
public static final String WIKIPEDIA_FIRST_LAST = "wikipedia_first_last";
public static final String TEST_SUPERUSER_NAME = "testSuperuser";
public static final AuthorizerMapper TEST_AUTHORIZER_MAPPER = new AuthorizerMapper(null)


@@ -47,7 +47,12 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
@@ -639,6 +644,57 @@ public class TestDataBuilder
.buildMMappedIndex();
}
public static QueryableIndex makeWikipediaIndexWithAggregation(File tmpDir)
{
final List<DimensionSchema> dimensions = Arrays.asList(
new StringDimensionSchema("channel"),
new StringDimensionSchema("cityName"),
new StringDimensionSchema("comment"),
new StringDimensionSchema("countryIsoCode"),
new StringDimensionSchema("countryName"),
new StringDimensionSchema("isAnonymous"),
new StringDimensionSchema("isMinor"),
new StringDimensionSchema("isNew"),
new StringDimensionSchema("isRobot"),
new StringDimensionSchema("isUnpatrolled"),
new StringDimensionSchema("metroCode"),
new StringDimensionSchema("namespace"),
new StringDimensionSchema("page"),
new StringDimensionSchema("regionIsoCode"),
new StringDimensionSchema("regionName"),
new StringDimensionSchema("user")
);
return IndexBuilder
.create()
.tmpDir(new File(tmpDir, "wikipedia1"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(new IncrementalIndexSchema.Builder()
.withRollup(true)
.withTimestampSpec(new TimestampSpec("time", null, null))
.withDimensionsSpec(new DimensionsSpec(dimensions))
.withMetrics(
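// Each factory takes (name, fieldName, timeColumn): at ingestion time the
// metric stores a (time, value) pair per rolled-up row, which is what lets
// EARLIEST/LATEST recombine the values correctly at query time.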
new LongLastAggregatorFactory("long_last_added", "added", "__time"),
new LongFirstAggregatorFactory("long_first_added", "added", "__time"),
new FloatLastAggregatorFactory("float_last_added", "added", "__time"),
new FloatLastAggregatorFactory("float_first_added", "added", "__time"),
new DoubleLastAggregatorFactory("double_last_added", "added", "__time"),
new DoubleFirstAggregatorFactory("double_first_added", "added", "__time")
)
.build()
)
.inputSource(
ResourceInputSource.of(
TestDataBuilder.class.getClassLoader(),
"calcite/tests/wikiticker-2015-09-12-sampled.json.gz"
)
)
.inputFormat(DEFAULT_JSON_INPUT_FORMAT)
.inputTmpDir(new File(tmpDir, "tmpWikipedia1"))
.buildMMappedIndex();
}
public static SpecificSegmentsQuerySegmentWalker createMockWalker(
final Injector injector,
final QueryRunnerFactoryConglomerate conglomerate,
@@ -873,13 +929,22 @@ public class TestDataBuilder
userVisitIndex
).add(
DataSegment.builder()
.dataSource("wikipedia")
.dataSource(CalciteTests.WIKIPEDIA)
.interval(Intervals.of("2015-09-12/2015-09-13"))
.version("1")
.shardSpec(new NumberedShardSpec(0, 0))
.size(0)
.build(),
makeWikipediaIndex(tmpDir)
).add(
DataSegment.builder()
.dataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
.interval(Intervals.of("2015-09-12/2015-09-13"))
.version("1")
.shardSpec(new NumberedShardSpec(0, 0))
.size(0)
.build(),
makeWikipediaIndexWithAggregation(tmpDir)
);
}


@@ -59,6 +59,12 @@ const KNOWN_TYPES = [
'longMax',
'doubleMax',
'floatMax',
'longFirst',
'longLast',
'doubleFirst',
'doubleLast',
'floatFirst',
'floatLast',
'stringFirst',
'stringLast',
'thetaSketch',
@@ -97,10 +103,14 @@ export const METRIC_SPEC_FIELDS: Field<MetricSpec>[] = [
group: 'max',
suggestions: ['longMax', 'doubleMax', 'floatMax'],
},
- // Do not show first and last aggregators as they can not be used in ingestion specs and this definition is only used in the data loader.
- // Ref: https://druid.apache.org/docs/latest/querying/aggregations.html#first--last-aggregator
- // Should the first / last aggregators become usable at ingestion time, reverse the changes made in:
- // https://github.com/apache/druid/pull/10794
+ {
+   group: 'first',
+   suggestions: ['longFirst', 'doubleFirst', 'floatFirst', 'stringFirst'],
+ },
+ {
+   group: 'last',
+   suggestions: ['longLast', 'doubleLast', 'floatLast', 'stringLast'],
+ },
'thetaSketch',
'arrayOfDoublesSketch',
{
@@ -129,6 +139,14 @@ export const METRIC_SPEC_FIELDS: Field<MetricSpec>[] = [
'longMax',
'doubleMax',
'floatMax',
'longFirst',
'longLast',
'doubleFirst',
'doubleLast',
'floatFirst',
'floatLast',
'stringFirst',
'stringLast',
'thetaSketch',
'arrayOfDoublesSketch',
'HLLSketchBuild',
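
With the first/last types added to the suggestions above, the data loader can emit them into a spec's metricsSpec. A hypothetical fragment of the resulting spec (names and fields are illustrative; timeColumn is optional and defaults to __time):

"metricsSpec": [
  { "type": "longFirst", "name": "long_first_added", "fieldName": "added" },
  { "type": "longLast", "name": "long_last_added", "fieldName": "added" },
  { "type": "doubleLast", "name": "double_last_added", "fieldName": "added" }
]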