Regression bug fix wherever LimitFrameProcessors were used. (#13941)

Karan Kumar 2023-03-16 21:48:18 +05:30 committed by GitHub
parent 67df1324ee
commit bf13156b55
8 changed files with 239 additions and 7 deletions


@@ -80,6 +80,6 @@ public class MixShuffleSpec implements ShuffleSpec
   @Override
   public String toString()
   {
-    return "MuxShuffleSpec{}";
+    return "MixShuffleSpec{}";
   }
 }


@@ -27,7 +27,6 @@ import org.apache.druid.frame.key.ClusterBy;
  * Describes how outputs of a stage are shuffled. Property of {@link StageDefinition}.
  *
  * When the output of a stage is shuffled, it is globally sorted and partitioned according to the {@link ClusterBy}.
- * Hash-based (non-sorting) shuffle is not currently implemented.
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {

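For readers unfamiliar with the annotations closing the hunk above: they are standard Jackson polymorphic typing, where a "type" property in the JSON picks the concrete ShuffleSpec implementation during deserialization. A minimal self-contained sketch of the mechanism, using hypothetical spec classes and type names rather than the real Druid ones:

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PolymorphicSpecSketch
{
  // Hypothetical stand-in for ShuffleSpec: the "type" property in the
  // serialized JSON selects which concrete subtype Jackson instantiates.
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  @JsonSubTypes(value = {
      @JsonSubTypes.Type(value = MixSpec.class, name = "mix"),
      @JsonSubTypes.Type(value = SortSpec.class, name = "globalSort")
  })
  interface Spec {}

  static class MixSpec implements Spec {}

  static class SortSpec implements Spec {}

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    Spec spec = mapper.readValue("{\"type\": \"mix\"}", Spec.class);
    System.out.println(spec.getClass().getSimpleName()); // prints MixSpec
  }
}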

@@ -883,6 +883,9 @@ class ControllerStageTracker
           workerCount,
           resultPartitionBoundaries.size()
       );
+    } else if (shuffleSpec.kind() == ShuffleKind.MIX) {
+      resultPartitionBoundaries = ClusterByPartitions.oneUniversalPartition();
+      resultPartitions = ReadablePartitions.striped(stageNumber, workerCount, shuffleSpec.partitionCount());
     } else {
       resultPartitions = ReadablePartitions.striped(stageNumber, workerCount, shuffleSpec.partitionCount());
     }
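This is the heart of the fix: a MIX shuffle has no sort key, so the controller cannot derive partition boundaries from collected key statistics; instead it assigns one universal partition boundary while still reading worker outputs striped. A standalone sketch of that decision, with a hypothetical ShuffleKind enum and return strings standing in for the real MSQ kernel types:

// Standalone sketch of the branch above; ShuffleKind and the returned
// descriptions are illustrative stand-ins, not the real Druid classes.
enum ShuffleKind
{
  GLOBAL_SORT, // boundaries computed from collected key statistics
  MIX,         // rows are mixed together with no sort key
  NONE         // no shuffle at all
}

class PartitionDecisionSketch
{
  static String decidePartitions(ShuffleKind kind, int workerCount, int partitionCount)
  {
    switch (kind) {
      case GLOBAL_SORT:
        return partitionCount + " partitions derived from sorted key statistics";
      case MIX:
        // The regression fix: with no sort key there is exactly one
        // universal partition boundary, but reads remain striped so
        // downstream stages still see every worker's output.
        return "1 universal partition, striped across " + workerCount + " workers";
      default:
        return "no boundaries, striped across " + workerCount + " workers";
    }
  }

  public static void main(String[] args)
  {
    System.out.println(decidePartitions(ShuffleKind.MIX, 4, 1));
  }
}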


@@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.msq.input.stage.StageInputSpec;
-import org.apache.druid.msq.kernel.MixShuffleSpec;
 import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
 import org.apache.druid.msq.kernel.StageDefinition;
@@ -158,13 +157,12 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
     if (doLimitOrOffset) {
       final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
       queryDefBuilder.add(
           StageDefinition.builder(firstStageNumber + 2)
                          .inputs(new StageInputSpec(firstStageNumber + 1))
                          .signature(resultSignature)
                          .maxWorkerCount(1)
-                         .shuffleSpec(MixShuffleSpec.instance())
+                         .shuffleSpec(null) // no shuffling should be required after a limit processor.
                          .processorFactory(
                              new OffsetLimitFrameProcessorFactory(
                                  limitSpec.getOffset(),

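The inline comment carries the reasoning behind both this hunk and the matching ScanQueryKit change just below: the limit stage already runs on a single worker (maxWorkerCount(1)), and OffsetLimitFrameProcessorFactory only drops a prefix of rows and truncates the rest, so forcing a mix shuffle after it was pure overhead and the source of the regression. Conceptually the processor applies ordinary offset/limit semantics; a minimal illustrative sketch with Java streams, not the actual frame-based implementation:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative only: offset/limit over an ordered row stream, the
// semantics OffsetLimitFrameProcessorFactory applies to frames.
public class OffsetLimitSketch
{
  static <T> List<T> offsetLimit(Stream<T> rows, long offset, long limit)
  {
    // Skip the first `offset` rows, then keep at most `limit` rows.
    return rows.skip(offset).limit(limit).collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    // offset 2, limit 3 over six rows -> prints [3, 4, 5]
    System.out.println(offsetLimit(Stream.of(1, 2, 3, 4, 5, 6), 2, 3));
  }
}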

@@ -161,7 +161,7 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
                        .inputs(new StageInputSpec(firstStageNumber))
                        .signature(signatureToUse)
                        .maxWorkerCount(1)
-                       .shuffleSpec(MixShuffleSpec.instance())
+                       .shuffleSpec(null) // no shuffling should be required after a limit processor.
                        .processorFactory(
                            new OffsetLimitFrameProcessorFactory(
                                queryToRun.getScanRowsOffset(),


@@ -192,6 +192,105 @@ public class MSQInsertTest extends MSQTestBase
   }
 
+  @Test
+  public void testInsertOnFoo1WithGroupByLimitWithoutClusterBy()
+  {
+    List<Object[]> expectedRows = expectedFooRows();
+    int expectedCounterRows = expectedRows.size();
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("dim1", ColumnType.STRING)
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testIngestQuery().setSql(
+                         "insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 limit 10 PARTITIONED by All")
+                     .setExpectedDataSource("foo1")
+                     .setQueryContext(context)
+                     .setExpectedRowSignature(rowSignature)
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
+                     .setExpectedResultRows(expectedRows)
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher.with().totalFiles(1),
+                         0, 0, "input0"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher.with().rows(expectedCounterRows).frames(1),
+                         0, 0, "shuffle"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher.with().rows(expectedCounterRows).frames(1),
+                         1, 0, "input0"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher.with().rows(expectedCounterRows).frames(1),
+                         2, 0, "input0"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher.with().rows(expectedCounterRows).frames(1),
+                         3, 0, "input0"
+                     )
+                     .verifyResults();
+  }
+
+  @Test
+  public void testInsertOnFoo1WithGroupByLimitWithClusterBy()
+  {
+    List<Object[]> expectedRows = expectedFooRows();
+    int expectedCounterRows = expectedRows.size();
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("dim1", ColumnType.STRING)
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testIngestQuery().setSql(
+                         "insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 limit 10 PARTITIONED by All clustered by 2,3")
+                     .setExpectedDataSource("foo1")
+                     .setQueryContext(context)
+                     .setExpectedRowSignature(rowSignature)
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
+                     .setExpectedResultRows(expectedRows)
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher.with().totalFiles(1),
+                         0, 0, "input0"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher.with().rows(expectedCounterRows).frames(1),
+                         0, 0, "shuffle"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher.with().rows(expectedCounterRows).frames(1),
+                         1, 0, "input0"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher.with().rows(expectedCounterRows).frames(1),
+                         1, 0, "shuffle"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher.with().rows(expectedCounterRows).frames(1),
+                         2, 0, "input0"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher.with().rows(expectedCounterRows).frames(1),
+                         3, 0, "input0"
+                     )
+                     .verifyResults();
+  }
+
   @Test
   public void testInsertOnFoo1WithTimeFunction()
   {
@@ -284,6 +383,40 @@ public class MSQInsertTest extends MSQTestBase
                      .verifyResults();
   }
 
+  @Test
+  public void testInsertOnFoo1WithLimitWithoutClusterBy()
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("dim3", ColumnType.STRING)
+                                            .build();
+
+    testIngestQuery().setSql(
+                         "INSERT INTO foo1 SELECT dim3 FROM foo WHERE dim3 IS NOT NULL limit 10 PARTITIONED BY ALL TIME")
+                     .setExpectedDataSource("foo1")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
+                     .setExpectedResultRows(expectedMultiValueFooRows())
+                     .verifyResults();
+  }
+
+  @Test
+  public void testInsertOnFoo1WithLimitWithClusterBy()
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("dim3", ColumnType.STRING)
+                                            .build();
+
+    testIngestQuery().setSql(
+                         "INSERT INTO foo1 SELECT dim3 FROM foo WHERE dim3 IS NOT NULL limit 10 PARTITIONED BY ALL TIME clustered by dim3")
+                     .setExpectedDataSource("foo1")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
+                     .setExpectedResultRows(expectedMultiValueFooRows())
+                     .verifyResults();
+  }
+
   @Test
   public void testInsertOnFoo1WithMultiValueDimGroupBy()
   {


@@ -371,6 +371,102 @@ public class MSQSelectTest extends MSQTestBase
         .verifyResults();
   }
 
+  @Test
+  public void testSelectWithLimit()
+  {
+    RowSignature resultSignature = RowSignature.builder()
+                                               .add("cnt", ColumnType.LONG)
+                                               .add("dim1", ColumnType.STRING)
+                                               .build();
+
+    testSelectQuery()
+        .setSql("select cnt,dim1 from foo limit 10")
+        .setExpectedMSQSpec(
+            MSQSpec.builder()
+                   .query(
+                       newScanQueryBuilder()
+                           .dataSource(CalciteTests.DATASOURCE1)
+                           .intervals(querySegmentSpec(Filtration.eternity()))
+                           .columns("cnt", "dim1")
+                           .context(defaultScanQueryContext(context, resultSignature))
+                           .limit(10)
+                           .build()
+                   )
+                   .columnMappings(ColumnMappings.identity(resultSignature))
+                   .tuningConfig(MSQTuningConfig.defaultConfig())
+                   .build()
+        )
+        .setQueryContext(context)
+        .setExpectedRowSignature(resultSignature)
+        .setExpectedCountersForStageWorkerChannel(
+            CounterSnapshotMatcher.with().totalFiles(1),
+            0, 0, "input0"
+        )
+        .setExpectedCountersForStageWorkerChannel(
+            CounterSnapshotMatcher.with().rows(6).frames(1),
+            0, 0, "output"
+        )
+        .setExpectedCountersForStageWorkerChannel(
+            CounterSnapshotMatcher.with().rows(6).frames(1),
+            0, 0, "shuffle"
+        )
+        .setExpectedResultRows(ImmutableList.of(
+            new Object[]{1L, !useDefault ? "" : null},
+            new Object[]{1L, "10.1"},
+            new Object[]{1L, "2"},
+            new Object[]{1L, "1"},
+            new Object[]{1L, "def"},
+            new Object[]{1L, "abc"}
+        ))
+        .verifyResults();
+  }
+
+  @Test
+  public void testSelectWithGroupByLimit()
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("cnt", ColumnType.LONG)
+                                            .add("cnt1", ColumnType.LONG)
+                                            .build();
+
+    testSelectQuery()
+        .setSql("select cnt,count(*) as cnt1 from foo group by cnt limit 10")
+        .setQueryContext(context)
+        .setExpectedMSQSpec(
+            MSQSpec.builder()
+                   .query(
+                       GroupByQuery.builder()
+                                   .setDataSource(CalciteTests.DATASOURCE1)
+                                   .setInterval(querySegmentSpec(Filtration.eternity()))
+                                   .setGranularity(Granularities.ALL)
+                                   .setDimensions(dimensions(
+                                       new DefaultDimensionSpec("cnt", "d0", ColumnType.LONG)
+                                   ))
+                                   .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+                                   .setContext(context)
+                                   .setLimit(10)
+                                   .build()
+                   )
+                   .columnMappings(
+                       new ColumnMappings(ImmutableList.of(
+                           new ColumnMapping("d0", "cnt"),
+                           new ColumnMapping("a0", "cnt1")
+                       ))
+                   )
+                   .tuningConfig(MSQTuningConfig.defaultConfig())
+                   .build()
+        )
+        .setExpectedRowSignature(rowSignature)
+        .setExpectedResultRows(ImmutableList.of(new Object[]{1L, 6L}))
+        .verifyResults();
+  }
+
   @Test
   public void testSubquery()
   {


@@ -942,7 +942,10 @@ public class MSQTestBase extends BaseCalciteQueryTest
               worker,
               channel
           );
-          Assert.assertTrue(channelToCounters.containsKey(channel));
+          Assert.assertTrue(
+              StringUtils.format("Counters not found for stage [%d], worker [%d], channel [%s]", stage, worker, channel),
+              channelToCounters.containsKey(channel)
+          );
           counter.matchQuerySnapshot(errorMessageFormat, channelToCounters.get(channel));
         }
     );
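The test-harness change swaps a bare assertTrue for JUnit's message-bearing overload, so a missing counter now names the stage, worker, and channel instead of failing with an anonymous AssertionError. A small self-contained sketch of the same pattern, with a hypothetical map argument and java.lang.String.format standing in for Druid's StringUtils.format:

import java.util.Map;
import org.junit.Assert;

// Sketch of the message-bearing Assert.assertTrue overload used above;
// the map and its contents are hypothetical.
public class AssertWithMessageSketch
{
  static void requireChannel(Map<String, Long> channelToCounters, int stage, int worker, String channel)
  {
    // Before: Assert.assertTrue(channelToCounters.containsKey(channel));
    // a failure printed no hint about which counter was missing.
    Assert.assertTrue(
        String.format("Counters not found for stage [%d], worker [%d], channel [%s]", stage, worker, channel),
        channelToCounters.containsKey(channel)
    );
  }
}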