TableInputSpecSlicer changes to support running on Brokers. (#17074)

* TableInputSpecSlicer changes to support running on Brokers.

Changes:

1) Rename TableInputSpecSlicer to IndexerTableInputSpecSlicer, in anticipation
   of a new implementation being added for controllers running on Brokers.

2) Allow the context to use the WorkerManager to build the TableInputSpecSlicer,
   in anticipation of Brokers wanting to use this to assign segments to servers
   that are already serving those segments (see the sketch after this list).

3) Remove unused DataSegmentTimelineView interface.

4) Add additional javadoc to DataSegmentProvider.
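
To make the intent of item 2 concrete, here is a minimal, hypothetical sketch of the kind of assignment a Broker-side slicer could perform once it receives the WorkerManager: route each segment to a server that is already serving it. Everything below (the ServedSegmentAssignment class, the Segment record, the server names) is illustrative only and not part of this patch:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class ServedSegmentAssignment
    {
      // Stand-in for a segment plus the set of servers currently serving it.
      record Segment(String id, Set<String> servers) {}

      // Assigns each segment to the least-loaded server among those already
      // serving it, so reads can go to servers that have the data locally.
      static Map<String, List<Segment>> assign(List<Segment> segments)
      {
        final Map<String, List<Segment>> assignments = new HashMap<>();
        for (final Segment segment : segments) {
          final String server =
              segment.servers()
                     .stream()
                     .min(Comparator.comparingInt(s -> assignments.getOrDefault(s, List.of()).size()))
                     .orElseThrow(); // a real slicer would fall back to deep storage here
          assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(segment);
        }
        return assignments;
      }

      public static void main(String[] args)
      {
        final List<Segment> segments = List.of(
            new Segment("seg1", Set.of("historical-a", "historical-b")),
            new Segment("seg2", Set.of("historical-b")),
            new Segment("seg3", Set.of("historical-a"))
        );
        System.out.println(assign(segments)); // each segment lands on a server that serves it
      }
    }

A real implementation might build SegmentsInputSlice objects from such groupings and fall back to deep-storage reads for segments with no live server.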

* Style.
Gian Merlino, 2024-09-17 03:51:18 -07:00 (committed by GitHub)
parent c56e23ec37
commit 8af9b4729f
9 changed files with 29 additions and 66 deletions

ControllerContext.java

@@ -74,7 +74,7 @@ public interface ControllerContext
   /**
    * Provides an {@link InputSpecSlicer} that slices {@link TableInputSpec} into {@link SegmentsInputSlice}.
    */
-  InputSpecSlicer newTableInputSpecSlicer();
+  InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager);
 
   /**
    * Provide access to segment actions in the Overlord. Only called for ingestion queries, i.e., where

ControllerImpl.java

@@ -366,7 +366,7 @@ public class ControllerImpl implements Controller
 
     // Execution-related: run the multi-stage QueryDefinition.
     final InputSpecSlicerFactory inputSpecSlicerFactory =
-        makeInputSpecSlicerFactory(context.newTableInputSpecSlicer());
+        makeInputSpecSlicerFactory(context.newTableInputSpecSlicer(workerManager));
 
     final Pair<ControllerQueryKernel, ListenableFuture<?>> queryRunResult =
         new RunQueryUntilDone(

IndexerControllerContext.java

@@ -45,7 +45,6 @@ import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.MSQWarnings;
 import org.apache.druid.msq.indexing.error.UnknownFault;
 import org.apache.druid.msq.input.InputSpecSlicer;
-import org.apache.druid.msq.input.table.TableInputSpecSlicer;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -149,11 +148,11 @@ public class IndexerControllerContext implements ControllerContext
   }
 
   @Override
-  public InputSpecSlicer newTableInputSpecSlicer()
+  public InputSpecSlicer newTableInputSpecSlicer(final WorkerManager workerManager)
   {
     final SegmentSource includeSegmentSource =
         MultiStageQueryContext.getSegmentSources(task.getQuerySpec().getQuery().context());
-    return new TableInputSpecSlicer(
+    return new IndexerTableInputSpecSlicer(
         toolbox.getCoordinatorClient(),
         toolbox.getTaskActionClient(),
         includeSegmentSource

TableInputSpecSlicer.java → IndexerTableInputSpecSlicer.java (renamed, moved from org.apache.druid.msq.input.table to org.apache.druid.msq.indexing)

@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.msq.input.table;
+package org.apache.druid.msq.indexing;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -35,6 +35,12 @@ import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.NilInputSlice;
 import org.apache.druid.msq.input.SlicerUtils;
+import org.apache.druid.msq.input.table.DataSegmentWithLocation;
+import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
+import org.apache.druid.msq.input.table.DataServerSelector;
+import org.apache.druid.msq.input.table.RichSegmentDescriptor;
+import org.apache.druid.msq.input.table.SegmentsInputSlice;
+import org.apache.druid.msq.input.table.TableInputSpec;
 import org.apache.druid.query.filter.DimFilterUtils;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.timeline.DataSegment;
@@ -60,15 +66,15 @@ import java.util.stream.StreamSupport;
 /**
  * Slices {@link TableInputSpec} into {@link SegmentsInputSlice} in tasks.
  */
-public class TableInputSpecSlicer implements InputSpecSlicer
+public class IndexerTableInputSpecSlicer implements InputSpecSlicer
 {
-  private static final Logger log = new Logger(TableInputSpecSlicer.class);
+  private static final Logger log = new Logger(IndexerTableInputSpecSlicer.class);
 
   private final CoordinatorClient coordinatorClient;
   private final TaskActionClient taskActionClient;
   private final SegmentSource includeSegmentSource;
 
-  public TableInputSpecSlicer(
+  public IndexerTableInputSpecSlicer(
       CoordinatorClient coordinatorClient,
       TaskActionClient taskActionClient,
       SegmentSource includeSegmentSource

SegmentsInputSlice.java

@@ -32,7 +32,7 @@ import java.util.Objects;
 /**
  * Input slice representing a set of segments to read.
  * <br>
- * Sliced from {@link TableInputSpec} by {@link TableInputSpecSlicer}.
+ * Sliced from {@link TableInputSpec}.
  * <br>
  * Similar to {@link org.apache.druid.query.spec.MultipleSpecificSegmentSpec} from native queries.
 * <br>

DataSegmentProvider.java

@@ -32,8 +32,14 @@ public interface DataSegmentProvider
    * Returns a supplier that fetches the segment corresponding to the provided segmentId from deep storage. The segment
    * is not actually fetched until you call {@link Supplier#get()}. Once you call this, make sure to also call
    * {@link ResourceHolder#close()}.
-   * <br>
+   *
    * It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}.
+   *
+   * @param segmentId       segment ID to fetch
+   * @param channelCounters counters to increment when the segment is closed
+   * @param isReindex       true if this is a DML command (INSERT or REPLACE) writing into the same table it is
+   *                        reading from; false otherwise. When true, implementations must only allow reading from
+   *                        segments that are currently-used according to the Coordinator.
    */
   Supplier<ResourceHolder<Segment>> fetchSegment(
       SegmentId segmentId,
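
As a usage note for the javadoc above, a sketch of the intended call pattern. The provider, segmentId, and channelCounters variables are assumed to be in scope; the key point is that the returned Supplier is lazy:

    // Nothing has been fetched yet. If get() is never called, there is no
    // need to call close().
    final Supplier<ResourceHolder<Segment>> supplier =
        provider.fetchSegment(segmentId, channelCounters, false /* isReindex */);

    // The fetch from deep storage happens here; from this point on, the
    // holder must be closed.
    final ResourceHolder<Segment> holder = supplier.get();
    try {
      final Segment segment = holder.get();
      // ... read from the segment ...
    }
    finally {
      holder.close();
    }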

DataSegmentTimelineView.java (deleted)

@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.querykit;
-
-import org.apache.druid.query.planning.DataSourceAnalysis;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.TimelineLookup;
-import org.joda.time.Interval;
-
-import java.util.List;
-import java.util.Optional;
-
-public interface DataSegmentTimelineView
-{
-  /**
-   * Returns the timeline for a datasource, if it 'exists'. The analysis object passed in must represent a scan-based
-   * datasource of a single table. (i.e., {@link DataSourceAnalysis#getBaseTableDataSource()} must be present.)
-   *
-   * @param dataSource table data source name
-   * @param intervals  relevant intervals. The returned timeline will *at least* include all segments that overlap
-   *                   these intervals. It may also include more. Empty means the timeline may not contain any
-   *                   segments at all.
-   *
-   * @return timeline, if it 'exists'
-   *
-   * @throws IllegalStateException if 'analysis' does not represent a scan-based datasource of a single table
-   */
-  Optional<TimelineLookup<String, DataSegment>> getTimeline(
-      String dataSource,
-      List<Interval> intervals
-  );
-}

TableInputSpecSlicerTest.java → IndexerTableInputSpecSlicerTest.java (renamed)

@@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.msq.exec.SegmentSource;
+import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer;
 import org.apache.druid.msq.input.NilInputSlice;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -41,7 +42,7 @@ import org.junit.Test;
 
 import java.util.Collections;
 
-public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
+public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest
 {
   private static final String DATASOURCE = "test-ds";
   private static final long BYTES_PER_SEGMENT = 1000;
@@ -97,7 +98,7 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
       BYTES_PER_SEGMENT
   );
   private SegmentTimeline timeline;
-  private TableInputSpecSlicer slicer;
+  private IndexerTableInputSpecSlicer slicer;
   private TaskActionClient taskActionClient;
 
   @Before
@@ -131,7 +132,7 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
       }
     };
 
-    slicer = new TableInputSpecSlicer(
+    slicer = new IndexerTableInputSpecSlicer(
        null /* not used for SegmentSource.NONE */,
        taskActionClient,
        SegmentSource.NONE

MSQTestControllerContext.java

@@ -54,11 +54,11 @@ import org.apache.druid.msq.exec.WorkerManager;
 import org.apache.druid.msq.exec.WorkerMemoryParameters;
 import org.apache.druid.msq.exec.WorkerStorageParameters;
 import org.apache.druid.msq.indexing.IndexerControllerContext;
+import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer;
 import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.MSQWorkerTask;
 import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
 import org.apache.druid.msq.input.InputSpecSlicer;
-import org.apache.druid.msq.input.table.TableInputSpecSlicer;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
@@ -303,9 +303,9 @@ public class MSQTestControllerContext implements ControllerContext
   }
 
   @Override
-  public InputSpecSlicer newTableInputSpecSlicer()
+  public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager)
   {
-    return new TableInputSpecSlicer(
+    return new IndexerTableInputSpecSlicer(
         coordinatorClient,
         taskActionClient,
         MultiStageQueryContext.getSegmentSources(queryContext)