From 8af9b4729f4fc8007f5ab4d18e3dbfdc3dea5c1c Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 17 Sep 2024 03:51:18 -0700 Subject: [PATCH] TableInputSpecSlicer changes to support running on Brokers. (#17074) * TableInputSpecSlicer changes to support running on Brokers. Changes: 1) Rename TableInputSpecSlicer to IndexerTableInputSpecSlicer, in anticipation of a new implementation being added for controllers running on Brokers. 2) Allow the context to use the WorkerManager to build the TableInputSpecSlicer, in anticipation of Brokers wanting to use this to assign segments to servers that are already serving those segments. 3) Remove unused DataSegmentTimelineView interface. 4) Add additional javadoc to DataSegmentProvider. * Style. --- .../druid/msq/exec/ControllerContext.java | 2 +- .../apache/druid/msq/exec/ControllerImpl.java | 2 +- .../indexing/IndexerControllerContext.java | 5 +- .../IndexerTableInputSpecSlicer.java} | 14 ++++-- .../msq/input/table/SegmentsInputSlice.java | 2 +- .../msq/querykit/DataSegmentProvider.java | 8 ++- .../msq/querykit/DataSegmentTimelineView.java | 49 ------------------- ...a => IndexerTableInputSpecSlicerTest.java} | 7 +-- .../msq/test/MSQTestControllerContext.java | 6 +-- 9 files changed, 29 insertions(+), 66 deletions(-) rename extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/{input/table/TableInputSpecSlicer.java => indexing/IndexerTableInputSpecSlicer.java} (96%) delete mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentTimelineView.java rename extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/{TableInputSpecSlicerTest.java => IndexerTableInputSpecSlicerTest.java} (98%) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 44b22af3666..58b32e96e7f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -74,7 +74,7 @@ public interface ControllerContext /** * Provides an {@link InputSpecSlicer} that slices {@link TableInputSpec} into {@link SegmentsInputSlice}. */ - InputSpecSlicer newTableInputSpecSlicer(); + InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager); /** * Provide access to segment actions in the Overlord. Only called for ingestion queries, i.e., where diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 8eda56ad857..8457675e8f2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -366,7 +366,7 @@ public class ControllerImpl implements Controller // Execution-related: run the multi-stage QueryDefinition. final InputSpecSlicerFactory inputSpecSlicerFactory = - makeInputSpecSlicerFactory(context.newTableInputSpecSlicer()); + makeInputSpecSlicerFactory(context.newTableInputSpecSlicer(workerManager)); final Pair> queryRunResult = new RunQueryUntilDone( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 589b17d632b..0e2cc03fda7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -45,7 +45,6 @@ import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.MSQWarnings; import org.apache.druid.msq.indexing.error.UnknownFault; import org.apache.druid.msq.input.InputSpecSlicer; -import org.apache.druid.msq.input.table.TableInputSpecSlicer; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -149,11 +148,11 @@ public class IndexerControllerContext implements ControllerContext } @Override - public InputSpecSlicer newTableInputSpecSlicer() + public InputSpecSlicer newTableInputSpecSlicer(final WorkerManager workerManager) { final SegmentSource includeSegmentSource = MultiStageQueryContext.getSegmentSources(task.getQuerySpec().getQuery().context()); - return new TableInputSpecSlicer( + return new IndexerTableInputSpecSlicer( toolbox.getCoordinatorClient(), toolbox.getTaskActionClient(), includeSegmentSource diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java similarity index 96% rename from extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java rename to extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java index 916dd3c1db3..48283bdd78a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.msq.input.table; +package org.apache.druid.msq.indexing; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -35,6 +35,12 @@ import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.input.NilInputSlice; import org.apache.druid.msq.input.SlicerUtils; +import org.apache.druid.msq.input.table.DataSegmentWithLocation; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; +import org.apache.druid.msq.input.table.DataServerSelector; +import org.apache.druid.msq.input.table.RichSegmentDescriptor; +import org.apache.druid.msq.input.table.SegmentsInputSlice; +import org.apache.druid.msq.input.table.TableInputSpec; import org.apache.druid.query.filter.DimFilterUtils; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; @@ -60,15 +66,15 @@ import java.util.stream.StreamSupport; /** * Slices {@link TableInputSpec} into {@link SegmentsInputSlice} in tasks. */ -public class TableInputSpecSlicer implements InputSpecSlicer +public class IndexerTableInputSpecSlicer implements InputSpecSlicer { - private static final Logger log = new Logger(TableInputSpecSlicer.class); + private static final Logger log = new Logger(IndexerTableInputSpecSlicer.class); private final CoordinatorClient coordinatorClient; private final TaskActionClient taskActionClient; private final SegmentSource includeSegmentSource; - public TableInputSpecSlicer( + public IndexerTableInputSpecSlicer( CoordinatorClient coordinatorClient, TaskActionClient taskActionClient, SegmentSource includeSegmentSource diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java index dd59dfebd80..6c4ec10d6df 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java @@ -32,7 +32,7 @@ import java.util.Objects; /** * Input slice representing a set of segments to read. *
- * Sliced from {@link TableInputSpec} by {@link TableInputSpecSlicer}. + * Sliced from {@link TableInputSpec}. *
* Similar to {@link org.apache.druid.query.spec.MultipleSpecificSegmentSpec} from native queries. *
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java index 91ee4a48788..392ac4e9150 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java @@ -32,8 +32,14 @@ public interface DataSegmentProvider * Returns a supplier that fetches the segment corresponding to the provided segmentId from deep storage. The segment * is not actually fetched until you call {@link Supplier#get()}. Once you call this, make sure to also call * {@link ResourceHolder#close()}. - *
+ * * It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}. + * + * @param segmentId segment ID to fetch + * @param channelCounters counters to increment when the segment is closed + * @param isReindex true if this is a DML command (INSERT or REPLACE) writing into the same table it is + * reading from; false otherwise. When true, implementations must only allow reading from + * segments that are currently-used according to the Coordinator. */ Supplier> fetchSegment( SegmentId segmentId, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentTimelineView.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentTimelineView.java deleted file mode 100644 index cc010a104c6..00000000000 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentTimelineView.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.msq.querykit; - -import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.TimelineLookup; -import org.joda.time.Interval; - -import java.util.List; -import java.util.Optional; - -public interface DataSegmentTimelineView -{ - /** - * Returns the timeline for a datasource, if it 'exists'. The analysis object passed in must represent a scan-based - * datasource of a single table. (i.e., {@link DataSourceAnalysis#getBaseTableDataSource()} must be present.) - * - * @param dataSource table data source name - * @param intervals relevant intervals. The returned timeline will *at least* include all segments that overlap - * these intervals. It may also include more. Empty means the timeline may not contain any - * segments at all. - * - * @return timeline, if it 'exists' - * - * @throws IllegalStateException if 'analysis' does not represent a scan-based datasource of a single table - */ - Optional> getTimeline( - String dataSource, - List intervals - ); -} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java similarity index 98% rename from extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java rename to extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java index a27ae7d9780..ac864419abe 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.msq.exec.SegmentSource; +import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer; import org.apache.druid.msq.input.NilInputSlice; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.testing.InitializedNullHandlingTest; @@ -41,7 +42,7 @@ import org.junit.Test; import java.util.Collections; -public class TableInputSpecSlicerTest extends InitializedNullHandlingTest +public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest { private static final String DATASOURCE = "test-ds"; private static final long BYTES_PER_SEGMENT = 1000; @@ -97,7 +98,7 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest BYTES_PER_SEGMENT ); private SegmentTimeline timeline; - private TableInputSpecSlicer slicer; + private IndexerTableInputSpecSlicer slicer; private TaskActionClient taskActionClient; @Before @@ -131,7 +132,7 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest } }; - slicer = new TableInputSpecSlicer( + slicer = new IndexerTableInputSpecSlicer( null /* not used for SegmentSource.NONE */, taskActionClient, SegmentSource.NONE diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index ed518afd2ef..cd20f24d244 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -54,11 +54,11 @@ import org.apache.druid.msq.exec.WorkerManager; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.exec.WorkerStorageParameters; import org.apache.druid.msq.indexing.IndexerControllerContext; +import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQWorkerTask; import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher; import org.apache.druid.msq.input.InputSpecSlicer; -import org.apache.druid.msq.input.table.TableInputSpecSlicer; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; @@ -303,9 +303,9 @@ public class MSQTestControllerContext implements ControllerContext } @Override - public InputSpecSlicer newTableInputSpecSlicer() + public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager) { - return new TableInputSpecSlicer( + return new IndexerTableInputSpecSlicer( coordinatorClient, taskActionClient, MultiStageQueryContext.getSegmentSources(queryContext)