diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 318c33a759c..528baa4c27d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -989,7 +989,7 @@ public class ControllerImpl implements Controller final Map, QueryKit> kitMap = ImmutableMap., QueryKit>builder() .put(ScanQuery.class, new ScanQueryKit(context.jsonMapper())) - .put(GroupByQuery.class, new GroupByQueryKit()) + .put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper())) .build(); return new MultiQueryKit(kitMap); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index a5c61c5bd75..30544cf31bf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -51,6 +51,7 @@ import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.external.ExternalDataSource; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -67,6 +68,16 @@ import java.util.stream.Collectors; */ public class DataSourcePlan { + /** + * A map with {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} set to null, so we can clear it from the context + * of subqueries. + */ + private static final Map CONTEXT_MAP_NO_SEGMENT_GRANULARITY = new HashMap<>(); + + static { + CONTEXT_MAP_NO_SEGMENT_GRANULARITY.put(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, null); + } + private final DataSource newDataSource; private final List inputSpecs; private final IntSet broadcastInputs; @@ -247,7 +258,10 @@ public class DataSourcePlan { final QueryDefinition subQueryDef = queryKit.makeQueryDefinition( queryId, - dataSource.getQuery(), + + // Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the + // outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong. + dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY), queryKit, ShuffleSpecFactories.subQueryWithMaxWorkerCount(maxWorkerCount), maxWorkerCount, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java index fcd72329165..1f863a8c735 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.calcite.sql.dialect.CalciteSqlDialect; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.SortColumn; -import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -76,15 +75,16 @@ public class QueryKitUtils */ public static final String CTX_TIME_COLUMN_NAME = "__timeColumn"; - private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); - - public static Granularity getSegmentGranularityFromContext(@Nullable final Map context) + public static Granularity getSegmentGranularityFromContext( + final ObjectMapper objectMapper, + @Nullable final Map context + ) { final Object o = context == null ? null : context.get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY); if (o instanceof String) { try { - return OBJECT_MAPPER.readValue((String) o, Granularity.class); + return objectMapper.readValue((String) o, Granularity.class); } catch (JsonProcessingException e) { throw new ISE("Invalid segment granularity [%s]", o); @@ -188,9 +188,10 @@ public class QueryKitUtils * @throws IllegalArgumentException if the provided granularity is not supported */ @Nullable - public static VirtualColumn makeSegmentGranularityVirtualColumn(final Query query) + public static VirtualColumn makeSegmentGranularityVirtualColumn(final ObjectMapper jsonMapper, final Query query) { - final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(query.getContext()); + final Granularity segmentGranularity = + QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, query.getContext()); final String timeColumnName = query.context().getString(QueryKitUtils.CTX_TIME_COLUMN_NAME); if (timeColumnName == null || Granularities.ALL.equals(segmentGranularity)) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java index a44c14e3cdc..207fe53de03 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java @@ -19,6 +19,7 @@ package org.apache.druid.msq.querykit.groupby; +import com.fasterxml.jackson.databind.ObjectMapper; import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; @@ -92,7 +93,8 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor final FrameReader frameReader, final RowSignature resultSignature, final ClusterBy clusterBy, - final MemoryAllocator allocator + final MemoryAllocator allocator, + final ObjectMapper jsonMapper ) { this.query = query; @@ -107,7 +109,7 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor this.finalizeFn = makeFinalizeFn(query); this.havingSpec = cloneHavingSpec(query); this.columnSelectorFactoryForFrameWriter = - makeVirtualColumnsForFrameWriter(query).wrap( + makeVirtualColumnsForFrameWriter(jsonMapper, query).wrap( RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory( query, () -> outputRow, @@ -311,9 +313,13 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor * Create virtual columns containing "bonus" fields that should be attached to the {@link FrameWriter} for * this processor. Kept in sync with the signature generated by {@link GroupByQueryKit}. */ - private static VirtualColumns makeVirtualColumnsForFrameWriter(final GroupByQuery query) + private static VirtualColumns makeVirtualColumnsForFrameWriter( + final ObjectMapper jsonMapper, + final GroupByQuery query + ) { - final VirtualColumn segmentGranularityVirtualColumn = QueryKitUtils.makeSegmentGranularityVirtualColumn(query); + final VirtualColumn segmentGranularityVirtualColumn = + QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); if (segmentGranularityVirtualColumn == null) { return VirtualColumns.EMPTY; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java index 5987eb02fae..ffb8bacf5e6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java @@ -118,7 +118,8 @@ public class GroupByPostShuffleFrameProcessorFactory extends BaseFrameProcessorF readableInput.getChannelFrameReader(), stageDefinition.getSignature(), stageDefinition.getClusterBy(), - outputChannel.getFrameMemoryAllocator() + outputChannel.getFrameMemoryAllocator(), + frameContext.jsonMapper() ); } ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index 411fe118a29..402d2dfa3d8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -19,6 +19,7 @@ package org.apache.druid.msq.querykit.groupby; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.SortColumn; @@ -56,6 +57,13 @@ import java.util.Optional; public class GroupByQueryKit implements QueryKit { + private final ObjectMapper jsonMapper; + + public GroupByQueryKit(ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + @Override public QueryDefinition makeQueryDefinition( final String queryId, @@ -85,7 +93,8 @@ public class GroupByQueryKit implements QueryKit final GroupByQuery queryToRun = (GroupByQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); - final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(queryToRun.getContext()); + final Granularity segmentGranularity = + QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext()); final RowSignature intermediateSignature = computeIntermediateSignature(queryToRun); final ClusterBy resultClusterBy = QueryKitUtils.clusterByWithSegmentGranularity(computeClusterByForResults(queryToRun), segmentGranularity); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index 307d274c73b..0482e2715dc 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -19,6 +19,7 @@ package org.apache.druid.msq.querykit.scan; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; @@ -92,7 +93,8 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor final ResourceHolder outputChannel, final ResourceHolder frameWriterFactoryHolder, @Nullable final AtomicLong runningCountForLimit, - final long memoryReservedForBroadcastJoin + final long memoryReservedForBroadcastJoin, + final ObjectMapper jsonMapper ) { super( @@ -111,7 +113,8 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor final List frameWriterVirtualColumns = new ArrayList<>(); frameWriterVirtualColumns.add(partitionBoostVirtualColumn); - final VirtualColumn segmentGranularityVirtualColumn = QueryKitUtils.makeSegmentGranularityVirtualColumn(query); + final VirtualColumn segmentGranularityVirtualColumn = + QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); if (segmentGranularityVirtualColumn != null) { frameWriterVirtualColumns.add(segmentGranularityVirtualColumn); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java index 2a948fd4562..bda53af6964 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java @@ -100,7 +100,8 @@ public class ScanQueryFrameProcessorFactory extends BaseLeafFrameProcessorFactor allocatorHolder )), runningCountForLimit, - frameContext.memoryParameters().getBroadcastJoinMemory() + frameContext.memoryParameters().getBroadcastJoinMemory(), + frameContext.jsonMapper() ); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index 5bfb70b52c9..9e44f152eb2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -116,7 +116,8 @@ public class ScanQueryKit implements QueryKit signatureToUse = scanSignature; } else { final RowSignature.Builder signatureBuilder = RowSignature.builder().addAll(scanSignature); - final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(queryToRun.getContext()); + final Granularity segmentGranularity = + QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext()); final List clusterByColumns = new ArrayList<>(); // Add regular orderBys. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index a91844114dd..2ec08e03783 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -207,6 +207,7 @@ public class MSQTaskSqlEngine implements SqlEngine try { segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext( + plannerContext.getJsonMapper(), plannerContext.queryContextMap() ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java index 2ea2958c736..d93e8df42df 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java @@ -37,6 +37,7 @@ import org.apache.druid.frame.testutil.FrameTestUtil; import org.apache.druid.frame.write.FrameWriter; import org.apache.druid.frame.write.FrameWriterFactory; import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; @@ -152,7 +153,8 @@ public class ScanQueryFrameProcessorTest extends InitializedNullHandlingTest }, new LazyResourceHolder<>(() -> Pair.of(frameWriterFactory, () -> {})), null, - 0L + 0L, + new DefaultObjectMapper() ); ListenableFuture retVal = exec.runFully(processor, null);