mirror of https://github.com/apache/druid.git
MSQ: Only look at sqlInsertSegmentGranularity on the outer query. (#13537)
The planner sets sqlInsertSegmentGranularity in its context when using PARTITIONED BY, which sets it on every native query in the stack (as all native queries for a SQL query typically have the same context). QueryKit would interpret that as a request to configure bucketing for all native queries. This isn't useful, as bucketing is only used for the penultimate stage in INSERT / REPLACE. So, this patch modifies QueryKit to only look at sqlInsertSegmentGranularity on the outermost query. As an additional change, this patch switches the static ObjectMapper to use the processwide ObjectMapper for deserializing Granularities. Saves an ObjectMapper instance, and ensures that if there are any special serdes registered for Granularity, we'll pick them up.
This commit is contained in:
parent
013a12e86f
commit
55814888f5
|
@ -989,7 +989,7 @@ public class ControllerImpl implements Controller
|
|||
final Map<Class<? extends Query>, QueryKit> kitMap =
|
||||
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
|
||||
.put(ScanQuery.class, new ScanQueryKit(context.jsonMapper()))
|
||||
.put(GroupByQuery.class, new GroupByQueryKit())
|
||||
.put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper()))
|
||||
.build();
|
||||
|
||||
return new MultiQueryKit(kitMap);
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.druid.query.spec.QuerySegmentSpec;
|
|||
import org.apache.druid.segment.column.ColumnHolder;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.apache.druid.sql.calcite.external.ExternalDataSource;
|
||||
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -67,6 +68,16 @@ import java.util.stream.Collectors;
|
|||
*/
|
||||
public class DataSourcePlan
|
||||
{
|
||||
/**
|
||||
* A map with {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} set to null, so we can clear it from the context
|
||||
* of subqueries.
|
||||
*/
|
||||
private static final Map<String, Object> CONTEXT_MAP_NO_SEGMENT_GRANULARITY = new HashMap<>();
|
||||
|
||||
static {
|
||||
CONTEXT_MAP_NO_SEGMENT_GRANULARITY.put(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, null);
|
||||
}
|
||||
|
||||
private final DataSource newDataSource;
|
||||
private final List<InputSpec> inputSpecs;
|
||||
private final IntSet broadcastInputs;
|
||||
|
@ -247,7 +258,10 @@ public class DataSourcePlan
|
|||
{
|
||||
final QueryDefinition subQueryDef = queryKit.makeQueryDefinition(
|
||||
queryId,
|
||||
dataSource.getQuery(),
|
||||
|
||||
// Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the
|
||||
// outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong.
|
||||
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
|
||||
queryKit,
|
||||
ShuffleSpecFactories.subQueryWithMaxWorkerCount(maxWorkerCount),
|
||||
maxWorkerCount,
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
|
||||
import org.apache.druid.frame.key.ClusterBy;
|
||||
import org.apache.druid.frame.key.SortColumn;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
@ -76,15 +75,16 @@ public class QueryKitUtils
|
|||
*/
|
||||
public static final String CTX_TIME_COLUMN_NAME = "__timeColumn";
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
|
||||
|
||||
public static Granularity getSegmentGranularityFromContext(@Nullable final Map<String, Object> context)
|
||||
public static Granularity getSegmentGranularityFromContext(
|
||||
final ObjectMapper objectMapper,
|
||||
@Nullable final Map<String, Object> context
|
||||
)
|
||||
{
|
||||
final Object o = context == null ? null : context.get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY);
|
||||
|
||||
if (o instanceof String) {
|
||||
try {
|
||||
return OBJECT_MAPPER.readValue((String) o, Granularity.class);
|
||||
return objectMapper.readValue((String) o, Granularity.class);
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
throw new ISE("Invalid segment granularity [%s]", o);
|
||||
|
@ -188,9 +188,10 @@ public class QueryKitUtils
|
|||
* @throws IllegalArgumentException if the provided granularity is not supported
|
||||
*/
|
||||
@Nullable
|
||||
public static VirtualColumn makeSegmentGranularityVirtualColumn(final Query<?> query)
|
||||
public static VirtualColumn makeSegmentGranularityVirtualColumn(final ObjectMapper jsonMapper, final Query<?> query)
|
||||
{
|
||||
final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(query.getContext());
|
||||
final Granularity segmentGranularity =
|
||||
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, query.getContext());
|
||||
final String timeColumnName = query.context().getString(QueryKitUtils.CTX_TIME_COLUMN_NAME);
|
||||
|
||||
if (timeColumnName == null || Granularities.ALL.equals(segmentGranularity)) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.msq.querykit.groupby;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import it.unimi.dsi.fastutil.ints.IntSet;
|
||||
import org.apache.druid.frame.Frame;
|
||||
import org.apache.druid.frame.FrameType;
|
||||
|
@ -92,7 +93,8 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Long>
|
|||
final FrameReader frameReader,
|
||||
final RowSignature resultSignature,
|
||||
final ClusterBy clusterBy,
|
||||
final MemoryAllocator allocator
|
||||
final MemoryAllocator allocator,
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.query = query;
|
||||
|
@ -107,7 +109,7 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Long>
|
|||
this.finalizeFn = makeFinalizeFn(query);
|
||||
this.havingSpec = cloneHavingSpec(query);
|
||||
this.columnSelectorFactoryForFrameWriter =
|
||||
makeVirtualColumnsForFrameWriter(query).wrap(
|
||||
makeVirtualColumnsForFrameWriter(jsonMapper, query).wrap(
|
||||
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
|
||||
query,
|
||||
() -> outputRow,
|
||||
|
@ -311,9 +313,13 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Long>
|
|||
* Create virtual columns containing "bonus" fields that should be attached to the {@link FrameWriter} for
|
||||
* this processor. Kept in sync with the signature generated by {@link GroupByQueryKit}.
|
||||
*/
|
||||
private static VirtualColumns makeVirtualColumnsForFrameWriter(final GroupByQuery query)
|
||||
private static VirtualColumns makeVirtualColumnsForFrameWriter(
|
||||
final ObjectMapper jsonMapper,
|
||||
final GroupByQuery query
|
||||
)
|
||||
{
|
||||
final VirtualColumn segmentGranularityVirtualColumn = QueryKitUtils.makeSegmentGranularityVirtualColumn(query);
|
||||
final VirtualColumn segmentGranularityVirtualColumn =
|
||||
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
|
||||
|
||||
if (segmentGranularityVirtualColumn == null) {
|
||||
return VirtualColumns.EMPTY;
|
||||
|
|
|
@ -118,7 +118,8 @@ public class GroupByPostShuffleFrameProcessorFactory extends BaseFrameProcessorF
|
|||
readableInput.getChannelFrameReader(),
|
||||
stageDefinition.getSignature(),
|
||||
stageDefinition.getClusterBy(),
|
||||
outputChannel.getFrameMemoryAllocator()
|
||||
outputChannel.getFrameMemoryAllocator(),
|
||||
frameContext.jsonMapper()
|
||||
);
|
||||
}
|
||||
);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.msq.querykit.groupby;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.frame.key.ClusterBy;
|
||||
import org.apache.druid.frame.key.SortColumn;
|
||||
|
@ -56,6 +57,13 @@ import java.util.Optional;
|
|||
|
||||
public class GroupByQueryKit implements QueryKit<GroupByQuery>
|
||||
{
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
public GroupByQueryKit(ObjectMapper jsonMapper)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryDefinition makeQueryDefinition(
|
||||
final String queryId,
|
||||
|
@ -85,7 +93,8 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
|
|||
final GroupByQuery queryToRun = (GroupByQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
|
||||
final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
|
||||
|
||||
final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(queryToRun.getContext());
|
||||
final Granularity segmentGranularity =
|
||||
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext());
|
||||
final RowSignature intermediateSignature = computeIntermediateSignature(queryToRun);
|
||||
final ClusterBy resultClusterBy =
|
||||
QueryKitUtils.clusterByWithSegmentGranularity(computeClusterByForResults(queryToRun), segmentGranularity);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.msq.querykit.scan;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Iterables;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
|
||||
|
@ -92,7 +93,8 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
|
|||
final ResourceHolder<WritableFrameChannel> outputChannel,
|
||||
final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
|
||||
@Nullable final AtomicLong runningCountForLimit,
|
||||
final long memoryReservedForBroadcastJoin
|
||||
final long memoryReservedForBroadcastJoin,
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -111,7 +113,8 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
|
|||
final List<VirtualColumn> frameWriterVirtualColumns = new ArrayList<>();
|
||||
frameWriterVirtualColumns.add(partitionBoostVirtualColumn);
|
||||
|
||||
final VirtualColumn segmentGranularityVirtualColumn = QueryKitUtils.makeSegmentGranularityVirtualColumn(query);
|
||||
final VirtualColumn segmentGranularityVirtualColumn =
|
||||
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
|
||||
|
||||
if (segmentGranularityVirtualColumn != null) {
|
||||
frameWriterVirtualColumns.add(segmentGranularityVirtualColumn);
|
||||
|
|
|
@ -100,7 +100,8 @@ public class ScanQueryFrameProcessorFactory extends BaseLeafFrameProcessorFactor
|
|||
allocatorHolder
|
||||
)),
|
||||
runningCountForLimit,
|
||||
frameContext.memoryParameters().getBroadcastJoinMemory()
|
||||
frameContext.memoryParameters().getBroadcastJoinMemory(),
|
||||
frameContext.jsonMapper()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -116,7 +116,8 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
|
|||
signatureToUse = scanSignature;
|
||||
} else {
|
||||
final RowSignature.Builder signatureBuilder = RowSignature.builder().addAll(scanSignature);
|
||||
final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(queryToRun.getContext());
|
||||
final Granularity segmentGranularity =
|
||||
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext());
|
||||
final List<SortColumn> clusterByColumns = new ArrayList<>();
|
||||
|
||||
// Add regular orderBys.
|
||||
|
|
|
@ -207,6 +207,7 @@ public class MSQTaskSqlEngine implements SqlEngine
|
|||
|
||||
try {
|
||||
segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(
|
||||
plannerContext.getJsonMapper(),
|
||||
plannerContext.queryContextMap()
|
||||
);
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.druid.frame.testutil.FrameTestUtil;
|
|||
import org.apache.druid.frame.write.FrameWriter;
|
||||
import org.apache.druid.frame.write.FrameWriterFactory;
|
||||
import org.apache.druid.frame.write.FrameWriters;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
|
@ -152,7 +153,8 @@ public class ScanQueryFrameProcessorTest extends InitializedNullHandlingTest
|
|||
},
|
||||
new LazyResourceHolder<>(() -> Pair.of(frameWriterFactory, () -> {})),
|
||||
null,
|
||||
0L
|
||||
0L,
|
||||
new DefaultObjectMapper()
|
||||
);
|
||||
|
||||
ListenableFuture<Long> retVal = exec.runFully(processor, null);
|
||||
|
|
Loading…
Reference in New Issue