From c648b1cb36c7671a281155e3a018c74592012b8d Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Mon, 31 Jul 2023 13:12:01 +0530 Subject: [PATCH] Add task toolbox to DruidInputSource (#14507) Add task toolbox to DruidInputSource --- .../indexing/common/task/CompactionTask.java | 2 +- .../druid/indexing/common/task/IndexTask.java | 31 ++++-- ...putSourceSplitParallelIndexTaskRunner.java | 2 +- .../parallel/ParallelIndexSupervisorTask.java | 3 + .../PartialDimensionCardinalityTask.java | 2 +- .../PartialDimensionDistributionTask.java | 2 +- .../parallel/PartialSegmentGenerateTask.java | 2 +- .../SinglePhaseParallelIndexTaskRunner.java | 2 +- .../batch/parallel/SinglePhaseSubTask.java | 2 +- .../indexing/input/DruidInputSource.java | 102 ++++++++++++++++-- .../druid/indexing/input/TaskInputSource.java | 35 ++++++ .../task/CompactionTaskParallelRunTest.java | 1 + 12 files changed, 160 insertions(+), 26 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/input/TaskInputSource.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 82bc43eb020..108d186b75d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -733,7 +733,7 @@ public class CompactionTask extends AbstractBatchIndexTask coordinatorClient, segmentCacheManagerFactory, toolbox.getConfig() - ), + ).withTaskToolbox(toolbox), null, false, compactionIOConfig.isDropExisting() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 9fbf8a6b50b..3a4b574d4e6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -63,6 +63,7 @@ import org.apache.druid.indexing.common.task.batch.partition.CompletePartitionAn import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis; import org.apache.druid.indexing.common.task.batch.partition.LinearPartitionAnalysis; import org.apache.druid.indexing.common.task.batch.partition.PartitionAnalysis; +import org.apache.druid.indexing.input.TaskInputSource; import org.apache.druid.indexing.overlord.sampler.InputSourceSampler; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -133,6 +134,7 @@ import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -140,8 +142,6 @@ import java.util.stream.Collectors; public class IndexTask extends AbstractBatchIndexTask implements ChatHandler { - - public static final HashFunction HASH_FUNCTION = Hashing.murmur3_128(); public static final String TYPE = "index"; @@ -504,7 +504,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler .inputIntervals() .isEmpty(); - final InputSource inputSource = ingestionSchema.getIOConfig().getInputSource(); + final InputSource inputSource = ingestionSchema.getIOConfig() + .getNonNullInputSource(toolbox); final File tmpDir = toolbox.getIndexingTmpDir(); @@ -1149,6 +1150,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler private final FirehoseFactory firehoseFactory; private final InputSource inputSource; + private final AtomicReference inputSourceWithToolbox = new AtomicReference<>(); private final InputFormat inputFormat; private boolean appendToExisting; private boolean dropExisting; @@ -1199,6 +1201,24 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler return inputSource; } + public InputSource getNonNullInputSource() + { + return Preconditions.checkNotNull(inputSource, "inputSource"); + } + + public InputSource getNonNullInputSource(TaskToolbox toolbox) + { + Preconditions.checkNotNull(inputSource, "inputSource"); + if (inputSourceWithToolbox.get() == null) { + if (inputSource instanceof TaskInputSource) { + inputSourceWithToolbox.set(((TaskInputSource) inputSource).withTaskToolbox(toolbox)); + } else { + inputSourceWithToolbox.set(inputSource); + } + } + return inputSourceWithToolbox.get(); + } + /** * Returns {@link InputFormat}. Can be null if {@link DataSchema#parserMap} is specified. * Also can be null in {@link InputSourceSampler}. @@ -1211,11 +1231,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler return inputFormat; } - public InputSource getNonNullInputSource() - { - return Preconditions.checkNotNull(inputSource, "inputSource"); - } - public InputFormat getNonNullInputFormat() { return Preconditions.checkNotNull(inputFormat, "inputFormat"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java index bfdad011556..286325186b2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java @@ -58,7 +58,7 @@ abstract class InputSourceSplitParallelIndexTaskRunner e @Override public final TaskStatus runTask(TaskToolbox toolbox) throws Exception { - final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(); + InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(toolbox); final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build( supervisorTaskId, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java index 7f781e466ed..9ea8e7d5313 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java @@ -117,7 +117,7 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner context ); this.ingestionSchema = ingestionSchema; - this.baseInputSource = (SplittableInputSource) ingestionSchema.getIOConfig().getNonNullInputSource(); + this.baseInputSource = (SplittableInputSource) ingestionSchema.getIOConfig().getNonNullInputSource(toolbox); } @VisibleForTesting diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 6882414dc01..349e9974fb7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -270,7 +270,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() ); - final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(); + final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(toolbox); final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build( supervisorTaskId, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 216b343babb..03d58cb96d9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -37,6 +37,7 @@ import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputFileAttribute; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.MaxSizeSplitHintSpec; @@ -46,8 +47,11 @@ import org.apache.druid.data.input.impl.InputEntityIteratingReader; import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.firehose.WindowedSegmentId; +import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.Comparators; @@ -67,6 +71,7 @@ import org.joda.time.Interval; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -82,12 +87,12 @@ import java.util.TreeMap; import java.util.stream.Stream; /** - * An {@link org.apache.druid.data.input.InputSource} that allows reading from Druid segments. + * An {@link InputSource} that allows reading from Druid segments. * * Used internally by {@link org.apache.druid.indexing.common.task.CompactionTask}, and can also be used directly. */ @JsonInclude(JsonInclude.Include.NON_NULL) -public class DruidInputSource extends AbstractInputSource implements SplittableInputSource> +public class DruidInputSource extends AbstractInputSource implements SplittableInputSource>, TaskInputSource { public static final String TYPE_KEY = "druid"; @@ -147,6 +152,9 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI */ private final List metrics; + @Nullable + private final TaskToolbox toolbox; + @JsonCreator public DruidInputSource( @JsonProperty("dataSource") final String dataSource, @@ -162,6 +170,35 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory, @JacksonInject TaskConfig taskConfig ) + { + this( + dataSource, + interval, + segmentIds, + dimFilter, + dimensions, + metrics, + indexIO, + coordinatorClient, + segmentCacheManagerFactory, + taskConfig, + null + ); + } + + private DruidInputSource( + final String dataSource, + @Nullable Interval interval, + @Nullable List segmentIds, + DimFilter dimFilter, + List dimensions, + List metrics, + IndexIO indexIO, + CoordinatorClient coordinatorClient, + SegmentCacheManagerFactory segmentCacheManagerFactory, + TaskConfig taskConfig, + @Nullable TaskToolbox toolbox + ) { Preconditions.checkNotNull(dataSource, "dataSource"); if ((interval == null && segmentIds == null) || (interval != null && segmentIds != null)) { @@ -177,6 +214,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient"); this.segmentCacheManagerFactory = Preconditions.checkNotNull(segmentCacheManagerFactory, "null segmentCacheManagerFactory"); this.taskConfig = Preconditions.checkNotNull(taskConfig, "null taskConfig"); + this.toolbox = toolbox; } @JsonIgnore @@ -231,6 +269,24 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI return metrics; } + @Override + public InputSource withTaskToolbox(TaskToolbox toolbox) + { + return new DruidInputSource( + this.dataSource, + this.interval, + this.segmentIds, + this.dimFilter, + this.dimensions, + this.metrics, + this.indexIO, + this.coordinatorClient, + this.segmentCacheManagerFactory, + this.taskConfig, + toolbox + ); + } + @Override protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) { @@ -307,7 +363,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI if (interval == null) { return getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds); } else { - return getTimelineForInterval(coordinatorClient, dataSource, interval); + return getTimelineForInterval(toolbox, coordinatorClient, dataSource, interval); } } @@ -322,6 +378,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI if (segmentIds == null) { return Streams.sequentialStreamFrom( createSplits( + toolbox, coordinatorClient, dataSource, interval, @@ -341,6 +398,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI if (segmentIds == null) { return Iterators.size( createSplits( + toolbox, coordinatorClient, dataSource, interval, @@ -365,7 +423,8 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI indexIO, coordinatorClient, segmentCacheManagerFactory, - taskConfig + taskConfig, + toolbox ); } @@ -413,6 +472,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI } public static Iterator>> createSplits( + TaskToolbox toolbox, CoordinatorClient coordinatorClient, String dataSource, Interval interval, @@ -431,6 +491,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI } final List> timelineSegments = getTimelineForInterval( + toolbox, coordinatorClient, dataSource, interval @@ -474,18 +535,37 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI } public static List> getTimelineForInterval( + TaskToolbox toolbox, CoordinatorClient coordinatorClient, String dataSource, Interval interval ) { - final Collection usedSegments = FutureUtils.getUnchecked( - coordinatorClient.fetchUsedSegments( - dataSource, - Collections.singletonList(Preconditions.checkNotNull(interval, "interval")) - ), - true - ); + Preconditions.checkNotNull(interval); + + final Collection usedSegments; + if (toolbox == null) { + usedSegments = FutureUtils.getUnchecked( + coordinatorClient.fetchUsedSegments(dataSource, Collections.singletonList(interval)), + true + ); + } else { + try { + usedSegments = toolbox.getTaskActionClient() + .submit( + new RetrieveUsedSegmentsAction( + dataSource, + null, + Collections.singletonList(interval), + Segments.ONLY_VISIBLE + ) + ); + } + catch (IOException e) { + LOG.error(e, "Error retrieving the used segments for interval[%s].", interval); + throw new RuntimeException(e); + } + } return SegmentTimeline.forSegments(usedSegments).lookup(interval); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/TaskInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/TaskInputSource.java new file mode 100644 index 00000000000..08a2adf0bb2 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/TaskInputSource.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.input; + +import org.apache.druid.data.input.InputSource; +import org.apache.druid.indexing.common.TaskToolbox; + +/** + * An InputSource that allows setting a {@link TaskToolbox} + * to be used for various purposes such as submitting task actions to the Overlord. + */ +public interface TaskInputSource extends InputSource +{ + /** + * Creates and returns a new {@code InputSource} which uses the given {@code TaskToolbox}. + */ + InputSource withTaskToolbox(TaskToolbox toolbox); +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 677b067f5cf..b49a223e718 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -756,6 +756,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis List>> splits = Lists.newArrayList( DruidInputSource.createSplits( + null, getCoordinatorClient(), DATA_SOURCE, INTERVAL_TO_INDEX,