WindowOperatorQueryFrameProcessorFactory: Pass QueryContext instead of WindowOperatorQuery to WindowOperatorQueryFrameProcessor (#17405)

* WindowOperatorQueryKit: Pass QueryContext instead of WindowOperatorQuery to subsequent layers

* Add serializer for QueryContext class

* Revert changes of WindowOperatorQueryFrameProcessorFactory json param

* Fix checkstyle

* Address review comment: Remove older method in favor of calling new method inline
This commit is contained in:
Akshat Jain 2024-11-07 11:29:49 +05:30 committed by GitHub
parent 9c25226e06
commit 73cbce9109
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 14 additions and 14 deletions

View File

@ -34,7 +34,7 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.expression.TimestampFloorExprMacro;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType;
@ -191,11 +191,11 @@ public class QueryKitUtils
* @throws IllegalArgumentException if the provided granularity is not supported
*/
@Nullable
public static VirtualColumn makeSegmentGranularityVirtualColumn(final ObjectMapper jsonMapper, final Query<?> query)
public static VirtualColumn makeSegmentGranularityVirtualColumn(final ObjectMapper jsonMapper, final QueryContext queryContext)
{
final Granularity segmentGranularity =
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, query.getContext());
final String timeColumnName = query.context().getString(QueryKitUtils.CTX_TIME_COLUMN_NAME);
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryContext.asMap());
final String timeColumnName = queryContext.getString(QueryKitUtils.CTX_TIME_COLUMN_NAME);
if (timeColumnName == null || Granularities.ALL.equals(segmentGranularity)) {
return null;

View File

@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
@ -46,7 +47,6 @@ import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
@ -87,7 +87,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
final AtomicInteger rowId = new AtomicInteger(0);
public WindowOperatorQueryFrameProcessor(
WindowOperatorQuery query,
QueryContext queryContext,
ReadableFrameChannel inputChannel,
WritableFrameChannel outputChannel,
FrameWriterFactory frameWriterFactory,
@ -100,7 +100,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
this.outputChannel = outputChannel;
this.frameWriterFactory = frameWriterFactory;
this.resultRowAndCols = new ArrayList<>();
this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context());
this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(queryContext);
this.operatorFactoryList = getOperatorFactoryListForStageDefinition(operatorFactoryList);
this.frameRowsAndColsBuilder = new RowsAndColumnsBuilder(this.maxRowsMaterialized);
@ -110,7 +110,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
final List<VirtualColumn> frameWriterVirtualColumns = new ArrayList<>();
final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, queryContext);
if (segmentGranularityVirtualColumn != null) {
frameWriterVirtualColumns.add(segmentGranularityVirtualColumn);
}

View File

@ -156,7 +156,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
outputChannels.get(readableInput.getStagePartition().getPartitionNumber());
return new WindowOperatorQueryFrameProcessor(
query,
query.context(),
readableInput.getChannel(),
outputChannel.getWritableChannel(),
stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator(), removeNullBytes),

View File

@ -321,7 +321,7 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Object>
virtualColumns.add(partitionBoostVirtualColumn);
final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query.context());
if (segmentGranularityVirtualColumn != null) {
virtualColumns.add(segmentGranularityVirtualColumn);
}

View File

@ -137,7 +137,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
frameWriterVirtualColumns.add(partitionBoostVirtualColumn);
final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query.context());
if (segmentGranularityVirtualColumn != null) {
frameWriterVirtualColumns.add(segmentGranularityVirtualColumn);

View File

@ -125,7 +125,7 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();
final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor(
query,
query.context(),
factChannel.getChannel(),
outputChannel.writable(),
frameWriterFactory,
@ -209,7 +209,7 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();
final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor(
query,
query.context(),
factChannel.getChannel(),
outputChannel.writable(),
frameWriterFactory,
@ -316,7 +316,7 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
);
final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor(
query,
query.context(),
factChannel.getChannel(),
countingWritableFrameChannel,
frameWriterFactory,