Add task toolbox to DruidInputSource (#14507)

AmatyaAvadhanula 2023-07-31 13:12:01 +05:30 committed by GitHub
parent 9e1650e327
commit c648b1cb36
12 changed files with 160 additions and 26 deletions
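In outline: the commit introduces a TaskInputSource interface (new file below) whose withTaskToolbox method rebinds an InputSource to the executing task's TaskToolbox. DruidInputSource implements it, so that when it runs inside a task it retrieves used segments by submitting a RetrieveUsedSegmentsAction through the task's action client instead of calling the Coordinator directly. A minimal sketch of the rebinding idiom, mirroring the check that getNonNullInputSource(TaskToolbox) performs in IndexTask below (the InputSources holder and bindToolbox helper are hypothetical, not part of the commit):

import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.input.TaskInputSource;

final class InputSources
{
  // Rebind only toolbox-aware sources; any other InputSource passes through unchanged.
  static InputSource bindToolbox(InputSource source, TaskToolbox toolbox)
  {
    if (source instanceof TaskInputSource) {
      return ((TaskInputSource) source).withTaskToolbox(toolbox);
    }
    return source;
  }
}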

File: CompactionTask.java

@@ -733,7 +733,7 @@ public class CompactionTask extends AbstractBatchIndexTask
coordinatorClient,
segmentCacheManagerFactory,
toolbox.getConfig()
-),
+).withTaskToolbox(toolbox),
null,
false,
compactionIOConfig.isDropExisting()

File: IndexTask.java

@@ -63,6 +63,7 @@ import org.apache.druid.indexing.common.task.batch.partition.CompletePartitionAn
import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
import org.apache.druid.indexing.common.task.batch.partition.LinearPartitionAnalysis;
import org.apache.druid.indexing.common.task.batch.partition.PartitionAnalysis;
+import org.apache.druid.indexing.input.TaskInputSource;
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -133,6 +134,7 @@ import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -140,8 +142,6 @@ import java.util.stream.Collectors;
public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
{
public static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();
public static final String TYPE = "index";
@@ -504,7 +504,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
.inputIntervals()
.isEmpty();
-final InputSource inputSource = ingestionSchema.getIOConfig().getInputSource();
+final InputSource inputSource = ingestionSchema.getIOConfig()
+.getNonNullInputSource(toolbox);
final File tmpDir = toolbox.getIndexingTmpDir();
@@ -1149,6 +1150,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
private final FirehoseFactory firehoseFactory;
private final InputSource inputSource;
+private final AtomicReference<InputSource> inputSourceWithToolbox = new AtomicReference<>();
private final InputFormat inputFormat;
private boolean appendToExisting;
private boolean dropExisting;
@@ -1199,6 +1201,24 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
return inputSource;
}
+public InputSource getNonNullInputSource()
+{
+return Preconditions.checkNotNull(inputSource, "inputSource");
+}
+public InputSource getNonNullInputSource(TaskToolbox toolbox)
+{
+Preconditions.checkNotNull(inputSource, "inputSource");
+if (inputSourceWithToolbox.get() == null) {
+if (inputSource instanceof TaskInputSource) {
+inputSourceWithToolbox.set(((TaskInputSource) inputSource).withTaskToolbox(toolbox));
+} else {
+inputSourceWithToolbox.set(inputSource);
+}
+}
+return inputSourceWithToolbox.get();
+}
/**
* Returns {@link InputFormat}. Can be null if {@link DataSchema#parserMap} is specified.
* Also can be null in {@link InputSourceSampler}.
@@ -1211,11 +1231,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
return inputFormat;
}
-public InputSource getNonNullInputSource()
-{
-return Preconditions.checkNotNull(inputSource, "inputSource");
-}
public InputFormat getNonNullInputFormat()
{
return Preconditions.checkNotNull(inputFormat, "inputFormat");
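
A note on the AtomicReference above: the toolbox-bound source is computed at most once per IOConfig instance and then cached, so every call site in this commit that resolves the source through getNonNullInputSource(toolbox) observes the same bound InputSource. A hedged usage sketch, with ioConfig and toolbox standing in for the task's own fields:

// Both calls return the same memoized instance; the second call skips rebinding.
InputSource first = ioConfig.getNonNullInputSource(toolbox);
InputSource second = ioConfig.getNonNullInputSource(toolbox);
// first == second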

File: InputSourceSplitParallelIndexTaskRunner.java

@@ -58,7 +58,7 @@ abstract class InputSourceSplitParallelIndexTaskRunner<T extends Task, R extends
context
);
this.ingestionSchema = ingestionSchema;
-this.baseInputSource = (SplittableInputSource) ingestionSchema.getIOConfig().getNonNullInputSource();
+this.baseInputSource = (SplittableInputSource) ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
}
@Override

File: ParallelIndexSupervisorTask.java

@@ -156,6 +156,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
* Only the compaction task can have a special base name.
*/
private final String baseSubtaskSpecName;
private final InputSource baseInputSource;
/**
@@ -514,6 +515,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
);
try {
initializeSubTaskCleaner();
if (isParallelMode()) {
@@ -521,6 +523,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
emitMetric(toolbox.getEmitter(), "ingest/count", 1);
this.toolbox = toolbox;
if (isGuaranteedRollup(
getIngestionMode(),
ingestionSchema.getTuningConfig()

File: PartialDimensionCardinalityTask.java

@@ -181,7 +181,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
-InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource();
+InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
InputFormat inputFormat = inputSource.needsFormat()
? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
: null;

File: PartialDimensionDistributionTask.java

@@ -228,7 +228,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
);
boolean isAssumeGrouped = partitionsSpec.isAssumeGrouped();
-InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource();
+InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
InputFormat inputFormat = inputSource.needsFormat()
? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
: null;

File: PartialSegmentGenerateTask.java

@@ -109,7 +109,7 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
@Override
public final TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
-final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource();
+InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build(
supervisorTaskId,

File: SinglePhaseParallelIndexTaskRunner.java

@@ -117,7 +117,7 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
context
);
this.ingestionSchema = ingestionSchema;
-this.baseInputSource = (SplittableInputSource) ingestionSchema.getIOConfig().getNonNullInputSource();
+this.baseInputSource = (SplittableInputSource) ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
}
@VisibleForTesting

File: SinglePhaseSubTask.java

@@ -270,7 +270,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
);
-final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource();
+final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build(
supervisorTaskId,

File: DruidInputSource.java

@@ -37,6 +37,7 @@ import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
@@ -46,8 +47,11 @@ import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
+import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Comparators;
@@ -67,6 +71,7 @@ import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -82,12 +87,12 @@ import java.util.TreeMap;
import java.util.stream.Stream;
/**
-* An {@link org.apache.druid.data.input.InputSource} that allows reading from Druid segments.
+* An {@link InputSource} that allows reading from Druid segments.
*
* Used internally by {@link org.apache.druid.indexing.common.task.CompactionTask}, and can also be used directly.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
-public class DruidInputSource extends AbstractInputSource implements SplittableInputSource<List<WindowedSegmentId>>
+public class DruidInputSource extends AbstractInputSource implements SplittableInputSource<List<WindowedSegmentId>>, TaskInputSource
{
public static final String TYPE_KEY = "druid";
@@ -147,6 +152,9 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
*/
private final List<String> metrics;
+@Nullable
+private final TaskToolbox toolbox;
@JsonCreator
public DruidInputSource(
@JsonProperty("dataSource") final String dataSource,
@@ -162,6 +170,35 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
@JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
@JacksonInject TaskConfig taskConfig
)
+{
+this(
+dataSource,
+interval,
+segmentIds,
+dimFilter,
+dimensions,
+metrics,
+indexIO,
+coordinatorClient,
+segmentCacheManagerFactory,
+taskConfig,
+null
+);
+}
+private DruidInputSource(
+final String dataSource,
+@Nullable Interval interval,
+@Nullable List<WindowedSegmentId> segmentIds,
+DimFilter dimFilter,
+List<String> dimensions,
+List<String> metrics,
+IndexIO indexIO,
+CoordinatorClient coordinatorClient,
+SegmentCacheManagerFactory segmentCacheManagerFactory,
+TaskConfig taskConfig,
+@Nullable TaskToolbox toolbox
+)
{
Preconditions.checkNotNull(dataSource, "dataSource");
if ((interval == null && segmentIds == null) || (interval != null && segmentIds != null)) {
@@ -177,6 +214,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
this.segmentCacheManagerFactory = Preconditions.checkNotNull(segmentCacheManagerFactory, "null segmentCacheManagerFactory");
this.taskConfig = Preconditions.checkNotNull(taskConfig, "null taskConfig");
+this.toolbox = toolbox;
}
@JsonIgnore
@@ -231,6 +269,24 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
return metrics;
}
+@Override
+public InputSource withTaskToolbox(TaskToolbox toolbox)
+{
+return new DruidInputSource(
+this.dataSource,
+this.interval,
+this.segmentIds,
+this.dimFilter,
+this.dimensions,
+this.metrics,
+this.indexIO,
+this.coordinatorClient,
+this.segmentCacheManagerFactory,
+this.taskConfig,
+toolbox
+);
+}
@Override
protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory)
{
@@ -307,7 +363,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
if (interval == null) {
return getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds);
} else {
-return getTimelineForInterval(coordinatorClient, dataSource, interval);
+return getTimelineForInterval(toolbox, coordinatorClient, dataSource, interval);
}
}
@@ -322,6 +378,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
if (segmentIds == null) {
return Streams.sequentialStreamFrom(
createSplits(
+toolbox,
coordinatorClient,
dataSource,
interval,
@@ -341,6 +398,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
if (segmentIds == null) {
return Iterators.size(
createSplits(
+toolbox,
coordinatorClient,
dataSource,
interval,
@@ -365,7 +423,8 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
indexIO,
coordinatorClient,
segmentCacheManagerFactory,
-taskConfig
+taskConfig,
+toolbox
);
}
@@ -413,6 +472,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
}
public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits(
+TaskToolbox toolbox,
CoordinatorClient coordinatorClient,
String dataSource,
Interval interval,
@@ -431,6 +491,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
}
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = getTimelineForInterval(
+toolbox,
coordinatorClient,
dataSource,
interval
@@ -474,18 +535,37 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
}
public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForInterval(
+TaskToolbox toolbox,
CoordinatorClient coordinatorClient,
String dataSource,
Interval interval
)
{
-final Collection<DataSegment> usedSegments = FutureUtils.getUnchecked(
-coordinatorClient.fetchUsedSegments(
-dataSource,
-Collections.singletonList(Preconditions.checkNotNull(interval, "interval"))
-),
-true
-);
+Preconditions.checkNotNull(interval);
+final Collection<DataSegment> usedSegments;
+if (toolbox == null) {
+usedSegments = FutureUtils.getUnchecked(
+coordinatorClient.fetchUsedSegments(dataSource, Collections.singletonList(interval)),
+true
+);
+} else {
+try {
+usedSegments = toolbox.getTaskActionClient()
+.submit(
+new RetrieveUsedSegmentsAction(
+dataSource,
+null,
+Collections.singletonList(interval),
+Segments.ONLY_VISIBLE
+)
+);
+}
+catch (IOException e) {
+LOG.error(e, "Error retrieving the used segments for interval[%s].", interval);
+throw new RuntimeException(e);
+}
+}
return SegmentTimeline.forSegments(usedSegments).lookup(interval);
}
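
getTimelineForInterval now has two retrieval paths: with a null toolbox (standalone callers, as in the updated test at the end of this diff) it keeps the old behavior of fetching used segments straight from the Coordinator, while inside a task it submits a RetrieveUsedSegmentsAction so the lookup flows through the task action client. A hedged sketch of the standalone path (the datasource and interval are examples; coordinatorClient is assumed to be in scope):

// Standalone callers pass a null toolbox and keep the direct Coordinator path.
List<TimelineObjectHolder<String, DataSegment>> holders =
    DruidInputSource.getTimelineForInterval(
        null,                // no TaskToolbox: falls back to coordinatorClient.fetchUsedSegments
        coordinatorClient,
        "wikipedia",         // example datasource
        Intervals.of("2023-01-01/2023-02-01")
    );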

File: TaskInputSource.java

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.input;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexing.common.TaskToolbox;
/**
* An InputSource that allows setting a {@link TaskToolbox}
* to be used for various purposes such as submitting task actions to the Overlord.
*/
public interface TaskInputSource extends InputSource
{
/**
* Creates and returns a new {@code InputSource} which uses the given {@code TaskToolbox}.
*/
InputSource withTaskToolbox(TaskToolbox toolbox);
}
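
The contract, as CompactionTask exercises it in the first file of this diff: rebinding returns a new InputSource and leaves the original untouched, which keeps the deserialized source immutable (DruidInputSource above copies all of its fields plus the toolbox). A small illustrative snippet (druidInputSource and toolbox are assumed to exist):

InputSource base = druidInputSource;                      // a TaskInputSource without a toolbox
InputSource bound = ((TaskInputSource) base).withTaskToolbox(toolbox);
// 'base' is unchanged; 'bound' routes segment lookups through task actions.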

File: CompactionTaskParallelRunTest.java

@@ -756,6 +756,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
List<InputSplit<List<WindowedSegmentId>>> splits = Lists.newArrayList(
DruidInputSource.createSplits(
+null,
getCoordinatorClient(),
DATA_SOURCE,
INTERVAL_TO_INDEX,