Mirror of https://github.com/apache/druid.git

fixes for inline subqueries when multi-value dimension is present (#9698)

* fixes for inline subqueries when multi-value dimension is present
* fix test
* allow missing capabilities for vectorized group by queries to be treated as single dims, since it means that column doesn't exist
* add comment

parent 28f56978ab
commit 68cc0b2e1c
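The core of the fix: a null ColumnCapabilities used to be taken as proof that a column is single-valued, which is only safe against real segments. For an 'inline' datasource produced by a subquery and merged on the broker, capabilities can be null even for a multi-value column. A minimal standalone sketch of the before/after predicate (a plain Boolean stands in for Druid's ColumnCapabilities; all names here are illustrative, not the real API):

class SingleValueCheckSketch
{
  // Old behavior: unknown (null) capabilities were always treated as single-valued.
  static boolean isSingleValuedOld(Boolean hasMultipleValues)
  {
    return hasMultipleValues == null || !hasMultipleValues;
  }

  // New behavior: unknown capabilities count as single-valued only when the caller
  // asserts that a missing column truly does not exist (i.e. on real segments).
  static boolean isSingleValuedNew(Boolean hasMultipleValues, boolean missingMeansNonexistent)
  {
    return (hasMultipleValues != null && !hasMultipleValues)
           || (missingMeansNonexistent && hasMultipleValues == null);
  }

  public static void main(String[] args)
  {
    Boolean unknown = null;  // e.g. broker-side inline subquery results
    System.out.println(isSingleValuedOld(unknown));         // true  -> wrongly took the single-value path
    System.out.println(isSingleValuedNew(unknown, false));  // false -> row-based engine stays safe
    System.out.println(isSingleValuedNew(unknown, true));   // true  -> fine for the vectorized engine on segments
  }
}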
@@ -227,7 +227,7 @@ public class GroupByQueryEngineV2
             processingBuffer,
             fudgeTimestamp,
             dims,
-            isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions()),
+            isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions(), false),
             cardinalityForArrayAggregation
         );
       } else {
@@ -238,7 +238,7 @@ public class GroupByQueryEngineV2
             processingBuffer,
             fudgeTimestamp,
             dims,
-            isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions())
+            isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions(), false)
         );
       }
     }
@@ -313,12 +313,15 @@ public class GroupByQueryEngineV2
   }

   /**
-   * Checks whether all "dimensions" are either single-valued or nonexistent (which is just as good as single-valued,
-   * since their selectors will show up as full of nulls).
+   * Checks whether all "dimensions" are either single-valued or, if allowed, nonexistent. Since non-existent column
+   * selectors will show up as full of nulls they are effectively single valued; however, capabilities can also be
+   * null during broker merge, for example with an 'inline' datasource subquery. 'missingMeansNonexistent' is sort of
+   * a hack to let the vectorized engine, which only operates on actual segments, still work in this case for
+   * non-existent columns.
    */
   public static boolean isAllSingleValueDims(
       final Function<String, ColumnCapabilities> capabilitiesFunction,
-      final List<DimensionSpec> dimensions
+      final List<DimensionSpec> dimensions,
+      final boolean missingMeansNonexistent
   )
   {
     return dimensions
@@ -333,7 +336,8 @@ public class GroupByQueryEngineV2

           // Now check column capabilities.
           final ColumnCapabilities columnCapabilities = capabilitiesFunction.apply(dimension.getDimension());
-          return columnCapabilities == null || !columnCapabilities.hasMultipleValues();
+          return (columnCapabilities != null && !columnCapabilities.hasMultipleValues()) ||
+                 (missingMeansNonexistent && columnCapabilities == null);
         });
   }

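For a single dimension with no extraction function, the new check works out to the following (a summary table, not part of the commit):

capability for column     missingMeansNonexistent=false   missingMeansNonexistent=true
single-valued             true                            true
multi-valued              false                           false
null (unknown/missing)    false                           true

The row-based GroupByQueryEngineV2 call sites above pass false, since they may run during broker merge where "missing" might just mean "unknown"; the vectorized engine in the next hunk passes true, since it only ever runs against real segments.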
@@ -83,7 +83,7 @@ public class VectorGroupByEngine
     // This situation should sort itself out pretty well once this engine supports multi-valued columns. Then we
     // won't have to worry about having this all-single-value-dims check here.

-    return GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, query.getDimensions())
+    return GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, query.getDimensions(), true)
           && query.getDimensions().stream().allMatch(DimensionSpec::canVectorize)
           && query.getAggregatorSpecs().stream().allMatch(AggregatorFactory::canVectorize)
           && adapter.canVectorize(filter, query.getVirtualColumns(), false);
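Passing true is safe here because this check only runs against real segment adapters, where a null capability genuinely means the column is absent (and so selects as all nulls). A sketch of how the eligibility conditions compose (parameter names are illustrative, not the real signature):

// Every condition must hold; a single multi-value dimension, a non-vectorizable
// dimension or aggregator, or an unsupported adapter falls back to the row-based engine.
static boolean canVectorizeSketch(
    boolean allDimsSingleValuedOrNonexistent,  // isAllSingleValueDims(..., true)
    boolean allDimsVectorizable,               // DimensionSpec::canVectorize
    boolean allAggsVectorizable,               // AggregatorFactory::canVectorize
    boolean adapterCanVectorize                // adapter.canVectorize(...)
)
{
  return allDimsSingleValuedOrNonexistent
         && allDimsVectorizable
         && allAggsVectorizable
         && adapterCanVectorize;
}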
@@ -132,9 +132,10 @@ public class TopNQueryEngine
       topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
     } else if (selector.isHasExtractionFn()) {
       topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-    } else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING
+    } else if (columnCapabilities == null || !(columnCapabilities.getType() == ValueType.STRING
                                                && columnCapabilities.isDictionaryEncoded())) {
-      // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings.
+      // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings, and for things we don't
+      // know, which can happen for 'inline' datasources when this is run on the broker.
       topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
     } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
       // Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be
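The effect of flipping that condition is that unknown capabilities now route to the heap-based algorithm rather than being assumed dictionary-encoded. A condensed sketch of the updated cascade (return values are descriptive strings; the real method assigns a TopNAlgorithm):

static String chooseTopNAlgorithmSketch(
    boolean timeExtraction,
    boolean hasExtractionFn,
    Boolean isDictionaryEncodedString,  // null = capabilities unknown, e.g. broker-side inline datasource
    boolean stringOutputType
)
{
  if (timeExtraction) {
    return "TimeExtractionTopNAlgorithm";
  } else if (hasExtractionFn) {
    return "HeapBasedTopNAlgorithm";
  } else if (isDictionaryEncodedString == null || !isDictionaryEncodedString) {
    // Non-Strings, non-dictionary-encoded Strings, and now unknown columns.
    return "HeapBasedTopNAlgorithm";
  } else if (!stringOutputType) {
    return "HeapBasedTopNAlgorithm";
  } else {
    return "dictionary-encoded fast path (not shown in this hunk)";
  }
}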
@@ -56,7 +56,11 @@ import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryHelper;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.query.topn.TopNQueryBuilder;
 import org.apache.druid.segment.InlineSegmentWrangler;
 import org.apache.druid.segment.MapSegmentWrangler;
 import org.apache.druid.segment.ReferenceCountingSegment;
@@ -107,6 +111,7 @@ public class ClientQuerySegmentWalkerTest

   private static final String FOO = "foo";
   private static final String BAR = "bar";
+  private static final String MULTI = "multi";

   private static final Interval INTERVAL = Intervals.of("2000/P1Y");
   private static final String VERSION = "A";
@@ -140,6 +145,20 @@ public class ClientQuerySegmentWalkerTest
                   .build()
   );

+  private static final InlineDataSource MULTI_VALUE_INLINE = InlineDataSource.fromIterable(
+      ImmutableList.<Object[]>builder()
+                   .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a", "b"), 1})
+                   .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a", "c"), 2})
+                   .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("b"), 3})
+                   .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("c"), 4})
+                   .build(),
+      RowSignature.builder()
+                  .addTimeColumn()
+                  .add("s", ValueType.STRING)
+                  .add("n", ValueType.LONG)
+                  .build()
+  );
+
   @Rule
   public ExpectedException expectedException = ExpectedException.none();

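Each Object[] row follows the declared signature order, {__time, s, n}, and the ImmutableList values in "s" are how these tests represent a multi-value string dimension. For example (illustrative annotation, not part of the commit):

// {946684800000L, ["a", "b"], 1}  ->  __time = 2000-01-01T00:00:00Z, s = ["a", "b"] (multi-value), n = 1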
@@ -399,6 +418,115 @@ public class ClientQuerySegmentWalkerTest
     Assert.assertEquals(2, scheduler.getTotalReleased().get());
   }

+  @Test
+  public void testGroupByOnScanMultiValue()
+  {
+    ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI)
+                                                      .columns("s", "n")
+                                                      .intervals(
+                                                          new MultipleIntervalSegmentSpec(
+                                                              ImmutableList.of(Intervals.ETERNITY)
+                                                          )
+                                                      )
+                                                      .legacy(false)
+                                                      .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                                      .build();
+    final GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource(new QueryDataSource(subquery))
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Intervals.ONLY_ETERNITY)
+                    .setDimensions(DefaultDimensionSpec.of("s"))
+                    .setAggregatorSpecs(new LongSumAggregatorFactory("sum_n", "n"))
+                    .build();
+
+    testQuery(
+        query,
+        // GroupBy handles its own subqueries; only the inner one will go to the cluster.
+        ImmutableList.of(
+            ExpectedQuery.cluster(subquery),
+            ExpectedQuery.local(
+                query.withDataSource(
+                    InlineDataSource.fromIterable(
+                        ImmutableList.of(
+                            new Object[]{ImmutableList.of("a", "b"), 1},
+                            new Object[]{ImmutableList.of("a", "c"), 2},
+                            new Object[]{ImmutableList.of("b"), 3},
+                            new Object[]{ImmutableList.of("c"), 4}
+                        ),
+                        RowSignature.builder().add("s", null).add("n", null).build()
+                    )
+                )
+            )
+        ),
+        ImmutableList.of(
+            new Object[]{"a", 3L},
+            new Object[]{"b", 4L},
+            new Object[]{"c", 6L}
+        )
+    );
+
+    Assert.assertEquals(2, scheduler.getTotalRun().get());
+    Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(2, scheduler.getTotalReleased().get());
+  }
+
+  @Test
+  public void testTopNScanMultiValue()
+  {
+    ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI)
+                                                      .columns("s", "n")
+                                                      .intervals(
+                                                          new MultipleIntervalSegmentSpec(
+                                                              ImmutableList.of(Intervals.ETERNITY)
+                                                          )
+                                                      )
+                                                      .legacy(false)
+                                                      .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                                      .build();
+    final TopNQuery query =
+        new TopNQueryBuilder().dataSource(new QueryDataSource(subquery))
+                              .granularity(Granularities.ALL)
+                              .intervals(Intervals.ONLY_ETERNITY)
+                              .dimension(DefaultDimensionSpec.of("s"))
+                              .metric("sum_n")
+                              .threshold(100)
+                              .aggregators(new LongSumAggregatorFactory("sum_n", "n"))
+                              .build();
+
+    testQuery(
+        query,
+        // GroupBy handles its own subqueries; only the inner one will go to the cluster.
+        ImmutableList.of(
+            ExpectedQuery.cluster(subquery),
+            ExpectedQuery.local(
+                query.withDataSource(
+                    InlineDataSource.fromIterable(
+                        ImmutableList.of(
+                            new Object[]{ImmutableList.of("a", "b"), 1},
+                            new Object[]{ImmutableList.of("a", "c"), 2},
+                            new Object[]{ImmutableList.of("b"), 3},
+                            new Object[]{ImmutableList.of("c"), 4}
+                        ),
+                        RowSignature.builder().add("s", null).add("n", null).build()
+                    )
+                )
+            )
+        ),
+        ImmutableList.of(
+            new Object[]{Intervals.ETERNITY.getStartMillis(), "c", 6L},
+            new Object[]{Intervals.ETERNITY.getStartMillis(), "b", 4L},
+            new Object[]{Intervals.ETERNITY.getStartMillis(), "a", 3L}
+        )
+    );
+
+    Assert.assertEquals(2, scheduler.getTotalRun().get());
+    Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(2, scheduler.getTotalReleased().get());
+  }
+
   @Test
   public void testJoinOnTableErrorCantInlineTable()
   {
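The expected results in both tests encode Druid's multi-value grouping semantics: grouping on a multi-value dimension implicitly unnests it, so the row {["a","b"], n=1} contributes 1 to both the "a" and "b" groups, giving sum_n of a=1+2=3, b=1+3=4, c=2+4=6. A minimal sketch of that expansion (plain Java, independent of Druid):

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MultiValueGroupBySketch
{
  public static void main(String[] args)
  {
    // The same four rows as MULTI_VALUE_INLINE, minus the time column.
    List<Object[]> rows = List.of(
        new Object[]{List.of("a", "b"), 1L},
        new Object[]{List.of("a", "c"), 2L},
        new Object[]{List.of("b"), 3L},
        new Object[]{List.of("c"), 4L}
    );
    // Unnest each multi-value "s" and sum "n" per distinct value.
    Map<String, Long> sumN = new TreeMap<>();
    for (Object[] row : rows) {
      for (Object value : (List<?>) row[0]) {
        sumN.merge((String) value, (Long) row[1], Long::sum);
      }
    }
    System.out.println(sumN);  // {a=3, b=4, c=6} -> matches the expected rows above
  }
}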
@@ -522,7 +650,8 @@ public class ClientQuerySegmentWalkerTest
         QueryStackTests.createClusterQuerySegmentWalker(
             ImmutableMap.of(
                 FOO, makeTimeline(FOO, FOO_INLINE),
-                BAR, makeTimeline(BAR, BAR_INLINE)
+                BAR, makeTimeline(BAR, BAR_INLINE),
+                MULTI, makeTimeline(MULTI, MULTI_VALUE_INLINE)
             ),
             joinableFactory,
             conglomerate,