Updating segment map function for QueryDataSource to ensure group by of group by of join data source gets into proper segment map function path (#14112)

* Updating segment map function for QueryDataSource to ensure group by of group by of join data source gets into proper segment map function path

* Adding unit tests for the failed case

* There you go coverage bot, be happy now
This commit is contained in:
Soumyava 2023-04-20 13:22:29 -07:00 committed by GitHub
parent e7ae825e0c
commit 8d60edcfcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 118 additions and 1 deletion

View File

@@ -98,7 +98,8 @@ public class QueryDataSource implements DataSource
AtomicLong cpuTime
)
{
return Function.identity();
final Query<?> subQuery = this.getQuery();
return subQuery.getDataSource().createSegmentMapFunction(subQuery, cpuTime);
}
@Override

View File

@@ -22,7 +22,9 @@ package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Rule;
@@ -30,6 +32,8 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class QueryDataSourceTest
{
@@ -50,9 +54,19 @@ public class QueryDataSourceTest
.granularity(Granularities.ALL)
.build();
// QueryDataSource wrapping a query over a plain table data source.
private final QueryDataSource queryOnTableDataSource = new QueryDataSource(queryOnTable);
// QueryDataSource wrapping a query over a lookup data source.
private final QueryDataSource queryOnLookupDataSource = new QueryDataSource(queryOnLookup);
// Group-by whose data source is itself a QueryDataSource — used to exercise the
// nested (query-of-query) segment-map-function delegation path.
private final GroupByQuery groupByQuery = new GroupByQuery.Builder()
.setDataSource(queryOnTableDataSource)
.setGranularity(Granularities.ALL)
.setInterval("2000/3000")
.build();
// Two levels of QueryDataSource nesting: query -> group-by -> query -> table.
private final QueryDataSource queryDataSource = new QueryDataSource(groupByQuery);
@Test
public void test_getTableNames_table()
{
@@ -155,4 +169,22 @@ public class QueryDataSourceTest
Assert.assertEquals(queryOnTableDataSource, deserialized);
}
@Test
public void test_withSegmentMapFunction()
{
// createSegmentMapFunction on a QueryDataSource delegates to the wrapped query's
// data source, so the doubly-nested source (query -> group-by -> query -> table)
// and the singly-nested one must yield equal segment map functions — presumably
// both resolve to the table's identity mapping (TODO confirm against
// TableDataSource.createSegmentMapFunction).
final AtomicLong cpuTimeForNested = new AtomicLong();
final AtomicLong cpuTimeForInner = new AtomicLong();
final Function<SegmentReference, SegmentReference> viaNestedSource =
queryDataSource.createSegmentMapFunction(groupByQuery, cpuTimeForNested);
final Function<SegmentReference, SegmentReference> viaInnerSource =
queryOnTableDataSource.createSegmentMapFunction(groupByQuery, cpuTimeForInner);
Assert.assertEquals(viaNestedSource, viaInnerSource);
}
}

View File

@@ -3143,6 +3143,90 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
// Regression test for the nested segment-map-function path: a GROUP BY over a
// GROUP BY whose data source is an INNER JOIN must still resolve the join's
// segment mapping through the intermediate QueryDataSource layers.
public void testGroupByOverGroupByOverInnerJoinOnTwoInlineDataSources(Map<String, Object> queryContext)
{
skipVectorize();
cannotVectorize();
testQuery(
"with abc as\n"
+ "(\n"
+ " SELECT dim1, \"__time\", m1 from foo WHERE \"dim1\" = '10.1'\n"
+ ")\n"
+ "SELECT dim1 from (SELECT dim1,__time FROM (SELECT t1.dim1, t1.\"__time\" from abc as t1 INNER JOIN abc as t2 on t1.dim1 = t2.dim1) GROUP BY 1,2) GROUP BY dim1\n",
queryContext,
ImmutableList.of(
// Expected native query: outer GROUP BY on "d0" ...
new GroupByQuery.Builder()
.setDataSource(
// ... over an inner GROUP BY whose data source is the join.
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
join(
// Left side: scan of foo filtered to dim1 = '10.1'; the filter
// lets the planner replace dim1 with the constant '10.1' ("v0").
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn(
"v0",
"\'10.1\'",
ColumnType.STRING
))
.columns(ImmutableList.of("__time", "v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
// Right side: the same filtered scan, projecting only dim1
// for the join condition.
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.columns(ImmutableList.of("dim1"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
"j0.",
// t1.dim1 = t2.dim1 becomes constant-v0 = j0.dim1.
equalsCondition(
makeColumnExpression("v0"),
makeColumnExpression("j0.dim1")
),
JoinType.INNER
))
.setInterval(querySegmentSpec(Filtration.eternity()))
// dim1 is again the constant '10.1' after join pushdown.
.setVirtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ColumnType.STRING))
.setGranularity(Granularities.ALL)
// Inner GROUP BY 1,2 -> (dim1, __time).
.setDimensions(new DefaultDimensionSpec(
"_v0",
"d0",
ColumnType.STRING
), new DefaultDimensionSpec(
"__time",
"d1",
ColumnType.LONG
))
.setContext(queryContext)
.build()
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
// Outer GROUP BY dim1, i.e. the inner query's "d0" output column.
.setDimensions(new DefaultDimensionSpec(
"d0",
"_d0",
ColumnType.STRING
))
.setContext(queryContext)
.setGranularity(Granularities.ALL)
.build()
),
// Only one row of foo matches dim1 = '10.1', so one grouped row results.
ImmutableList.of(
new Object[]{"10.1"}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testInnerJoinOnTwoInlineDataSources_withLeftDirectAccess(Map<String, Object> queryContext)