diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 44b22af3666..58b32e96e7f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -74,7 +74,7 @@ public interface ControllerContext /** * Provides an {@link InputSpecSlicer} that slices {@link TableInputSpec} into {@link SegmentsInputSlice}. */ - InputSpecSlicer newTableInputSpecSlicer(); + InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager); /** * Provide access to segment actions in the Overlord. Only called for ingestion queries, i.e., where diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 8eda56ad857..8457675e8f2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -366,7 +366,7 @@ public class ControllerImpl implements Controller // Execution-related: run the multi-stage QueryDefinition. final InputSpecSlicerFactory inputSpecSlicerFactory = - makeInputSpecSlicerFactory(context.newTableInputSpecSlicer()); + makeInputSpecSlicerFactory(context.newTableInputSpecSlicer(workerManager)); final Pair> queryRunResult = new RunQueryUntilDone( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 589b17d632b..0e2cc03fda7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -45,7 +45,6 @@ import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.MSQWarnings; import org.apache.druid.msq.indexing.error.UnknownFault; import org.apache.druid.msq.input.InputSpecSlicer; -import org.apache.druid.msq.input.table.TableInputSpecSlicer; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -149,11 +148,11 @@ public class IndexerControllerContext implements ControllerContext } @Override - public InputSpecSlicer newTableInputSpecSlicer() + public InputSpecSlicer newTableInputSpecSlicer(final WorkerManager workerManager) { final SegmentSource includeSegmentSource = MultiStageQueryContext.getSegmentSources(task.getQuerySpec().getQuery().context()); - return new TableInputSpecSlicer( + return new IndexerTableInputSpecSlicer( toolbox.getCoordinatorClient(), toolbox.getTaskActionClient(), includeSegmentSource diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java similarity index 96% rename from extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java rename to extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java index 916dd3c1db3..48283bdd78a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.msq.input.table; +package org.apache.druid.msq.indexing; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -35,6 +35,12 @@ import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.input.NilInputSlice; import org.apache.druid.msq.input.SlicerUtils; +import org.apache.druid.msq.input.table.DataSegmentWithLocation; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; +import org.apache.druid.msq.input.table.DataServerSelector; +import org.apache.druid.msq.input.table.RichSegmentDescriptor; +import org.apache.druid.msq.input.table.SegmentsInputSlice; +import org.apache.druid.msq.input.table.TableInputSpec; import org.apache.druid.query.filter.DimFilterUtils; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; @@ -60,15 +66,15 @@ import java.util.stream.StreamSupport; /** * Slices {@link TableInputSpec} into {@link SegmentsInputSlice} in tasks. */ -public class TableInputSpecSlicer implements InputSpecSlicer +public class IndexerTableInputSpecSlicer implements InputSpecSlicer { - private static final Logger log = new Logger(TableInputSpecSlicer.class); + private static final Logger log = new Logger(IndexerTableInputSpecSlicer.class); private final CoordinatorClient coordinatorClient; private final TaskActionClient taskActionClient; private final SegmentSource includeSegmentSource; - public TableInputSpecSlicer( + public IndexerTableInputSpecSlicer( CoordinatorClient coordinatorClient, TaskActionClient taskActionClient, SegmentSource includeSegmentSource diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java index dd59dfebd80..6c4ec10d6df 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java @@ -32,7 +32,7 @@ import java.util.Objects; /** * Input slice representing a set of segments to read. *
- * Sliced from {@link TableInputSpec} by {@link TableInputSpecSlicer}. + * Sliced from {@link TableInputSpec}. *
* Similar to {@link org.apache.druid.query.spec.MultipleSpecificSegmentSpec} from native queries. *
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java index 91ee4a48788..392ac4e9150 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java @@ -32,8 +32,14 @@ public interface DataSegmentProvider * Returns a supplier that fetches the segment corresponding to the provided segmentId from deep storage. The segment * is not actually fetched until you call {@link Supplier#get()}. Once you call this, make sure to also call * {@link ResourceHolder#close()}. - *
+ * * It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}. + * + * @param segmentId segment ID to fetch + * @param channelCounters counters to increment when the segment is closed + * @param isReindex true if this is a DML command (INSERT or REPLACE) writing into the same table it is + * reading from; false otherwise. When true, implementations must only allow reading from + * segments that are currently-used according to the Coordinator. */ Supplier> fetchSegment( SegmentId segmentId, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentTimelineView.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentTimelineView.java deleted file mode 100644 index cc010a104c6..00000000000 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentTimelineView.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.msq.querykit; - -import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.TimelineLookup; -import org.joda.time.Interval; - -import java.util.List; -import java.util.Optional; - -public interface DataSegmentTimelineView -{ - /** - * Returns the timeline for a datasource, if it 'exists'. The analysis object passed in must represent a scan-based - * datasource of a single table. (i.e., {@link DataSourceAnalysis#getBaseTableDataSource()} must be present.) - * - * @param dataSource table data source name - * @param intervals relevant intervals. The returned timeline will *at least* include all segments that overlap - * these intervals. It may also include more. Empty means the timeline may not contain any - * segments at all. - * - * @return timeline, if it 'exists' - * - * @throws IllegalStateException if 'analysis' does not represent a scan-based datasource of a single table - */ - Optional> getTimeline( - String dataSource, - List intervals - ); -} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java similarity index 98% rename from extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java rename to extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java index a27ae7d9780..ac864419abe 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.msq.exec.SegmentSource; +import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer; import org.apache.druid.msq.input.NilInputSlice; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.testing.InitializedNullHandlingTest; @@ -41,7 +42,7 @@ import org.junit.Test; import java.util.Collections; -public class TableInputSpecSlicerTest extends InitializedNullHandlingTest +public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest { private static final String DATASOURCE = "test-ds"; private static final long BYTES_PER_SEGMENT = 1000; @@ -97,7 +98,7 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest BYTES_PER_SEGMENT ); private SegmentTimeline timeline; - private TableInputSpecSlicer slicer; + private IndexerTableInputSpecSlicer slicer; private TaskActionClient taskActionClient; @Before @@ -131,7 +132,7 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest } }; - slicer = new TableInputSpecSlicer( + slicer = new IndexerTableInputSpecSlicer( null /* not used for SegmentSource.NONE */, taskActionClient, SegmentSource.NONE diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index ed518afd2ef..cd20f24d244 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -54,11 +54,11 @@ import org.apache.druid.msq.exec.WorkerManager; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.exec.WorkerStorageParameters; import org.apache.druid.msq.indexing.IndexerControllerContext; +import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQWorkerTask; import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher; import org.apache.druid.msq.input.InputSpecSlicer; -import org.apache.druid.msq.input.table.TableInputSpecSlicer; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; @@ -303,9 +303,9 @@ public class MSQTestControllerContext implements ControllerContext } @Override - public InputSpecSlicer newTableInputSpecSlicer() + public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager) { - return new TableInputSpecSlicer( + return new IndexerTableInputSpecSlicer( coordinatorClient, taskActionClient, MultiStageQueryContext.getSegmentSources(queryContext)