diff --git a/core/src/main/java/org/apache/druid/data/input/FiniteFirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/FiniteFirehoseFactory.java
index 42a16837b97..09f943bf367 100644
--- a/core/src/main/java/org/apache/druid/data/input/FiniteFirehoseFactory.java
+++ b/core/src/main/java/org/apache/druid/data/input/FiniteFirehoseFactory.java
@@ -22,6 +22,7 @@ package org.apache.druid.data.input;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.druid.data.input.impl.InputRowParser;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.stream.Stream;
@@ -52,13 +53,13 @@ public interface FiniteFirehoseFactory<T extends InputRowParser, S> extends FirehoseFactory<T>
    * lazily so that the listing overhead could be amortized.
    */
   @JsonIgnore
-  Stream<InputSplit<S>> getSplits() throws IOException;
+  Stream<InputSplit<S>> getSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException;

   /**
-   * Returns number of splits returned by {@link #getSplits()}.
+   * Returns number of splits returned by {@link #getSplits}.
    */
   @JsonIgnore
-  int getNumSplits() throws IOException;
+  int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException;

   /**
    * Returns the same {@link FiniteFirehoseFactory} but with the given {@link InputSplit}. The returned
diff --git a/core/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java b/core/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java
new file mode 100644
index 00000000000..6cca8ab7191
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * {@link SplitHintSpec} for IngestSegmentFirehoseFactory.
+ */
+public class SegmentsSplitHintSpec implements SplitHintSpec
+{
+  public static final String TYPE = "segments";
+
+  private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 500 * 1024 * 1024;
+
+  /**
+   * Maximum number of bytes of input segments to process in a single task.
+   * If a single segment is larger than this number, it will be processed by itself in a single task.
+   */
+  private final long maxInputSegmentBytesPerTask;
+
+  @JsonCreator
+  public SegmentsSplitHintSpec(
+      @JsonProperty("maxInputSegmentBytesPerTask") @Nullable Long maxInputSegmentBytesPerTask
+  )
+  {
+    this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null
+                                       ? DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK
+                                       : maxInputSegmentBytesPerTask;
+  }
+
+  @JsonProperty
+  public long getMaxInputSegmentBytesPerTask()
+  {
+    return maxInputSegmentBytesPerTask;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SegmentsSplitHintSpec that = (SegmentsSplitHintSpec) o;
+    return maxInputSegmentBytesPerTask == that.maxInputSegmentBytesPerTask;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(maxInputSegmentBytesPerTask);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SegmentsSplitHintSpec{" +
+           "maxInputSegmentBytesPerTask=" + maxInputSegmentBytesPerTask +
+           '}';
+  }
+}
diff --git a/core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java b/core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java
new file mode 100644
index 00000000000..69042a74d92
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * In native parallel indexing, the supervisor task partitions input data into splits and assigns each of them
+ * to a single sub task. How splits are created depends mainly on the input file format, but Druid users sometimes
+ * want to give a hint to control the amount of data each sub task will read. SplitHintSpec can be used for this
+ * purpose. Implementations can ignore the given hint.
+ *
+ * @see FiniteFirehoseFactory#getSplits(SplitHintSpec)
+ * @see FiniteFirehoseFactory#getNumSplits(SplitHintSpec)
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @Type(name = SegmentsSplitHintSpec.TYPE, value = SegmentsSplitHintSpec.class)
+})
+public interface SplitHintSpec
+{
+}
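To make the hint's intended use concrete, here is a minimal sketch (not part of this patch; the class name, the size-list representation, and the packing rule are assumptions for illustration) of the kind of grouping a splittable firehose could perform when handed a byte limit such as `SegmentsSplitHintSpec`'s `maxInputSegmentBytesPerTask`:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: packs input sizes (bytes per file or segment) into groups of
// at most maxBytes each, the way an implementation might consume the hint.
class SplitPacker
{
  static List<List<Long>> pack(List<Long> inputSizes, long maxBytes)
  {
    final List<List<Long>> splits = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (Long size : inputSizes) {
      // Flush the current group if adding this input would exceed the limit.
      // An oversized input still lands in a group of its own; inputs are never divided.
      if (!current.isEmpty() && currentBytes + size > maxBytes) {
        splits.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }
    return splits;
  }
}
```

Each inner list would back one `InputSplit`, so a larger limit yields fewer, larger sub tasks; for example, `pack(List.of(100L, 200L, 700L, 50L), 500L)` returns `[[100, 200], [700], [50]]`.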
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java
index ea6ac62c00b..4fe2e2884e5 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java
@@ -26,8 +26,10 @@ import org.apache.commons.io.LineIterator;
 import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.java.util.common.logger.Logger;

+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -102,14 +104,14 @@ public abstract class AbstractTextFilesFirehoseFactory<T>
   }

   @Override
-  public Stream<InputSplit<T>> getSplits() throws IOException
+  public Stream<InputSplit<T>> getSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException
   {
     initializeObjectsIfNeeded();
     return getObjects().stream().map(InputSplit::new);
   }

   @Override
-  public int getNumSplits() throws IOException
+  public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException
   {
     initializeObjectsIfNeeded();
     return getObjects().size();
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index fefc9108c48..c94eee08676 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -813,9 +813,11 @@ If you see this problem, it's recommended to set `skipOffsetFromLatest` to some
 |`maxRowsInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 1000000)|
 |`maxBytesInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (1/6 of max JVM memory)|
 |`maxTotalRows`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 20000000)|
+|`splitHintSpec`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = null)|
 |`indexSpec`|See [IndexSpec](../ingestion/index.md#indexspec)|no|
 |`maxPendingPersists`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 0 (meaning one persist can be running concurrently with ingestion, and none can be queued up))|
 |`pushTimeout`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 0)|
+|`maxNumConcurrentSubTasks`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 1)|

 ### Overlord
diff --git a/docs/ingestion/data-management.md b/docs/ingestion/data-management.md
index 893c186a0a2..054e9f00ae5 100644
--- a/docs/ingestion/data-management.md
+++ b/docs/ingestion/data-management.md
@@ -100,9 +100,10 @@ Compaction tasks merge all segments of the given interval. The syntax is:
     "id": <task_id>,
     "dataSource": <task_datasource>,
     "ioConfig": <IO config>,
-    "dimensions": <custom dimensionsSpec>,
+    "dimensionsSpec": <custom dimensionsSpec>,
+    "metricsSpec": <custom metricsSpec>,
     "segmentGranularity": <segment granularity after compaction>,
-    "tuningConfig": <index task tuningConfig>,
+    "tuningConfig": <parallel indexing task tuningConfig>,
     "context": <task context>
 }
 ```
@@ -116,7 +117,7 @@ Compaction tasks merge all segments of the given interval. The syntax is:
 |`dimensionsSpec`|Custom dimensionsSpec. Compaction task will use this dimensionsSpec if specified instead of generating one. See below for more details.|No|
 |`metricsSpec`|Custom metricsSpec. Compaction task will use this metricsSpec if specified rather than generating one.|No|
 |`segmentGranularity`|If this is set, compactionTask will change the segment granularity for the given interval. See `segmentGranularity` of [`granularitySpec`](index.md#granularityspec) for more details. See the below table for the behavior.|No|
-|`tuningConfig`|[Index task tuningConfig](../ingestion/native-batch.md#tuningconfig)|No|
+|`tuningConfig`|[Parallel indexing task tuningConfig](../ingestion/native-batch.md#tuningconfig)|No|
 |`context`|[Task context](../ingestion/tasks.md#context)|No|
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 9da957708fc..90cc4c75d2d 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -204,6 +204,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and users do not need to set it. This value represents the number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage, not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|1/6 of max JVM memory|no|
 |maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting to be pushed. Used in determining when intermediate pushing should occur.|20000000|no|
 |numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
+|splitHintSpec|Used to give a hint to control the amount of data that each first-phase task reads. This hint may be ignored depending on the firehose implementation. See [SplitHintSpec](#splithintspec) for more details.|null|no|
 |partitionsSpec|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false, `hashed` if `forceGuaranteedRollup` = true|no|
 |indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](index.md#indexspec)|null|no|
 |indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce the memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into the final published segment. See [IndexSpec](index.md#indexspec) for possible values.|same as indexSpec|no|
@@ -212,7 +213,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
 |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
 |segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentwriteoutmediumfactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no|
-|maxNumConcurrentSubTasks|Maximum number of tasks which can be run in parallel at the same time. The supervisor task would spawn worker tasks up to `maxNumConcurrentSubTasks` regardless of the current available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set to too large, too many worker tasks can be created which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no|
+|maxNumConcurrentSubTasks|Maximum number of sub tasks that can run in parallel at the same time. The supervisor task spawns worker tasks up to `maxNumConcurrentSubTasks` regardless of the currently available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set too large, too many worker tasks can be created, which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no|
 |maxRetry|Maximum number of retries on task failures.|3|no|
 |maxNumSegmentsToMerge|Max limit for the number of segments that a single task can merge at the same time in the second phase. Used only when `forceGuaranteedRollup` is set.|100|no|
 |totalNumMergeTasks|Total number of tasks to merge segments in the second phase when `forceGuaranteedRollup` is set.|10|no|
@@ -220,6 +221,22 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no|
 |chatHandlerNumRetries|Retries for reporting the pushed segments in worker tasks.|5|no|
+
+### `splitHintSpec`
+
+`SplitHintSpec` is used to give a hint when the supervisor task creates input splits.
+Note that each sub task processes a single input split, so this hint controls the amount of data each sub task reads during the first phase.
+
+Currently, only one splitHintSpec type, `segments`, is available.
+
+#### `SegmentsSplitHintSpec`
+
+`SegmentsSplitHintSpec` is used only for `IngestSegmentFirehoseFactory`.
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|type|This should always be `segments`.|none|yes|
+|maxInputSegmentBytesPerTask|Maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks).|500MB|no|
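+
+For example, the following (illustrative) tuningConfig asks the supervisor task to group input segments into splits of
+up to 400MB each and to run at most two sub tasks at a time:
+
+```json
+"tuningConfig": {
+  "type": "index_parallel",
+  "splitHintSpec": {
+    "type": "segments",
+    "maxInputSegmentBytesPerTask": 419430400
+  },
+  "maxNumConcurrentSubTasks": 2
+}
+```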
+
 ### `partitionsSpec`

 PartitionsSpec is to describe the secondary partitioning method.
@@ -785,7 +802,7 @@ This firehose will accept any type of parser, but will only utilize the list of
 |dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned.|no|
 |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
 |filter| See [Filters](../querying/filters.md)|no|
-|maxInputSegmentBytesPerTask|When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
+|maxInputSegmentBytesPerTask|Deprecated. Use [SegmentsSplitHintSpec](#segmentssplithintspec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). If not specified, the `maxInputSegmentBytesPerTask` of [SegmentsSplitHintSpec](#segmentssplithintspec) (500MB by default) is used.|no|
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java
index 3c33bb7b48d..5f89860fbee 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java
@@ -115,7 +115,7 @@ public class StaticS3FirehoseFactoryTest
         5
     );
     final List<FiniteFirehoseFactory<StringInputRowParser, URI>> subFactories = factory
-        .getSplits()
+        .getSplits(null)
         .map(factory::withSplit)
         .sorted(Comparator.comparing(eachFactory -> {
           final StaticS3FirehoseFactory staticS3FirehoseFactory = (StaticS3FirehoseFactory) eachFactory;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 492c7125cd1..115ff8103b0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -31,6 +31,7 @@ import com.google.common.collect.HashBiMap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -52,9 +53,10 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
-import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
-import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
-import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -129,7 +131,7 @@ public class CompactionTask extends AbstractBatchIndexTask
   @Nullable
   private final Granularity segmentGranularity;
   @Nullable
-  private final IndexTuningConfig tuningConfig;
+  private final ParallelIndexTuningConfig tuningConfig;
   private final ObjectMapper jsonMapper;
@JsonIgnore private final SegmentProvider segmentProvider; @@ -148,6 +150,8 @@ public class CompactionTask extends AbstractBatchIndexTask @JsonIgnore private final CoordinatorClient coordinatorClient; + private final IndexingServiceClient indexingServiceClient; + @JsonIgnore private final SegmentLoaderFactory segmentLoaderFactory; @@ -160,14 +164,11 @@ public class CompactionTask extends AbstractBatchIndexTask @JsonIgnore private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( (taskObject, config) -> { - final IndexTask indexTask = (IndexTask) taskObject; + final ParallelIndexSupervisorTask indexTask = (ParallelIndexSupervisorTask) taskObject; indexTask.stopGracefully(config); } ); - @JsonIgnore - private List indexTaskSpecs; - @JsonCreator public CompactionTask( @JsonProperty("id") final String id, @@ -180,13 +181,14 @@ public class CompactionTask extends AbstractBatchIndexTask @JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec, @JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec, @JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity, - @JsonProperty("tuningConfig") @Nullable final IndexTuningConfig tuningConfig, + @JsonProperty("tuningConfig") @Nullable final ParallelIndexTuningConfig tuningConfig, @JsonProperty("context") @Nullable final Map context, @JacksonInject ObjectMapper jsonMapper, @JacksonInject AuthorizerMapper authorizerMapper, @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, @JacksonInject CoordinatorClient coordinatorClient, + @JacksonInject @Nullable IndexingServiceClient indexingServiceClient, @JacksonInject SegmentLoaderFactory segmentLoaderFactory, @JacksonInject RetryPolicyFactory retryPolicyFactory, @JacksonInject AppenderatorsManager appenderatorsManager @@ -222,6 +224,7 @@ public class CompactionTask extends AbstractBatchIndexTask this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; + this.indexingServiceClient = indexingServiceClient; this.coordinatorClient = coordinatorClient; this.segmentLoaderFactory = segmentLoaderFactory; this.retryPolicyFactory = retryPolicyFactory; @@ -258,7 +261,7 @@ public class CompactionTask extends AbstractBatchIndexTask @Nullable @JsonProperty - public IndexTuningConfig getTuningConfig() + public ParallelIndexTuningConfig getTuningConfig() { return tuningConfig; } @@ -304,36 +307,36 @@ public class CompactionTask extends AbstractBatchIndexTask @Override public TaskStatus runTask(TaskToolbox toolbox) throws Exception { - if (indexTaskSpecs == null) { - final List ingestionSpecs = createIngestionSchema( - toolbox, - segmentProvider, - partitionConfigurationManager, - dimensionsSpec, - metricsSpec, - segmentGranularity, - jsonMapper, - coordinatorClient, - segmentLoaderFactory, - retryPolicyFactory - ); - indexTaskSpecs = IntStream - .range(0, ingestionSpecs.size()) - .mapToObj(i -> new IndexTask( - createIndexTaskSpecId(i), - getGroupId(), - getTaskResource(), - getDataSource(), - ingestionSpecs.get(i), - createContextForSubtask(), - authorizerMapper, - chatHandlerProvider, - rowIngestionMetersFactory, - appenderatorsManager - - )) - .collect(Collectors.toList()); - } + final List ingestionSpecs = createIngestionSchema( + toolbox, + segmentProvider, + partitionConfigurationManager, + dimensionsSpec, + metricsSpec, + segmentGranularity, + jsonMapper, + 
coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ); + final List indexTaskSpecs = IntStream + .range(0, ingestionSpecs.size()) + .mapToObj(i -> { + // taskId is used for different purposes in parallel indexing and local indexing. + // In parallel indexing, it's the taskId of the supervisor task. This supervisor taskId must be + // a valid taskId to communicate with sub tasks properly. We use the ID of the compaction task in this case. + // + // In local indexing, it's used as the sequence name for Appenderator. Even though a compaction task can run + // multiple index tasks (one per interval), the appenderator is not shared by those tasks. Each task creates + // a new Appenderator on its own instead. As a result, they should use different sequence names to allocate + // new segmentIds properly. See IndexerSQLMetadataStorageCoordinator.allocatePendingSegments() for details. + // In this case, we use different fake IDs for each created index task. + final String subtaskId = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1 + ? createIndexTaskSpecId(i) + : getId(); + return newTask(subtaskId, ingestionSpecs.get(i)); + }) + .collect(Collectors.toList()); if (indexTaskSpecs.isEmpty()) { log.warn("Can't find segments from inputSpec[%s], nothing to do.", ioConfig.getInputSpec()); @@ -344,7 +347,7 @@ public class CompactionTask extends AbstractBatchIndexTask log.info("Generated [%d] compaction task specs", totalNumSpecs); int failCnt = 0; - for (IndexTask eachSpec : indexTaskSpecs) { + for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) { final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec); if (!currentSubTaskHolder.setTask(eachSpec)) { log.info("Task is asked to stop. Finish as failed."); @@ -374,10 +377,30 @@ public class CompactionTask extends AbstractBatchIndexTask } } - private Map createContextForSubtask() + @VisibleForTesting + ParallelIndexSupervisorTask newTask(String taskId, ParallelIndexIngestionSpec ingestionSpec) + { + return new ParallelIndexSupervisorTask( + taskId, + getGroupId(), + getTaskResource(), + ingestionSpec, + createContextForSubtask(), + indexingServiceClient, + chatHandlerProvider, + authorizerMapper, + rowIngestionMetersFactory, + appenderatorsManager + ); + } + + @VisibleForTesting + Map createContextForSubtask() { final Map newContext = new HashMap<>(getContext()); newContext.put(CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, getId()); + // Set the priority of the compaction task. + newContext.put(Tasks.PRIORITY_KEY, getPriority()); return newContext; } @@ -387,12 +410,12 @@ public class CompactionTask extends AbstractBatchIndexTask } /** - * Generate {@link IndexIngestionSpec} from input segments. + * Generate {@link ParallelIndexIngestionSpec} from input segments. * * @return an empty list if input segments don't exist. Otherwise, a generated ingestionSpec. 
*/ @VisibleForTesting - static List createIngestionSchema( + static List createIngestionSchema( final TaskToolbox toolbox, final SegmentProvider segmentProvider, final PartitionConfigurationManager partitionConfigurationManager, @@ -424,7 +447,7 @@ public class CompactionTask extends AbstractBatchIndexTask toolbox.getIndexIO() ); - final IndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig(); + final ParallelIndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig(); if (segmentGranularity == null) { // original granularity @@ -437,7 +460,7 @@ public class CompactionTask extends AbstractBatchIndexTask .add(p) ); - final List specs = new ArrayList<>(intervalToSegments.size()); + final List specs = new ArrayList<>(intervalToSegments.size()); for (Entry>> entry : intervalToSegments.entrySet()) { final Interval interval = entry.getKey(); final List> segmentsToCompact = entry.getValue(); @@ -451,7 +474,7 @@ public class CompactionTask extends AbstractBatchIndexTask ); specs.add( - new IndexIngestionSpec( + new ParallelIndexIngestionSpec( dataSchema, createIoConfig( toolbox, @@ -479,7 +502,7 @@ public class CompactionTask extends AbstractBatchIndexTask ); return Collections.singletonList( - new IndexIngestionSpec( + new ParallelIndexIngestionSpec( dataSchema, createIoConfig( toolbox, @@ -495,7 +518,7 @@ public class CompactionTask extends AbstractBatchIndexTask } } - private static IndexIOConfig createIoConfig( + private static ParallelIndexIOConfig createIoConfig( TaskToolbox toolbox, DataSchema dataSchema, Interval interval, @@ -504,7 +527,7 @@ public class CompactionTask extends AbstractBatchIndexTask RetryPolicyFactory retryPolicyFactory ) { - return new IndexIOConfig( + return new ParallelIndexIOConfig( new IngestSegmentFirehoseFactory( dataSchema.getDataSource(), interval, @@ -787,18 +810,18 @@ public class CompactionTask extends AbstractBatchIndexTask static class PartitionConfigurationManager { @Nullable - private final IndexTuningConfig tuningConfig; + private final ParallelIndexTuningConfig tuningConfig; - PartitionConfigurationManager(@Nullable IndexTuningConfig tuningConfig) + PartitionConfigurationManager(@Nullable ParallelIndexTuningConfig tuningConfig) { this.tuningConfig = tuningConfig; } @Nullable - IndexTuningConfig computeTuningConfig() + ParallelIndexTuningConfig computeTuningConfig() { - IndexTuningConfig newTuningConfig = tuningConfig == null - ? IndexTuningConfig.createDefault() + ParallelIndexTuningConfig newTuningConfig = tuningConfig == null + ? 
ParallelIndexTuningConfig.defaultConfig() : tuningConfig; PartitionsSpec partitionsSpec = newTuningConfig.getGivenOrDefaultPartitionsSpec(); if (partitionsSpec instanceof DynamicPartitionsSpec) { @@ -822,6 +845,7 @@ public class CompactionTask extends AbstractBatchIndexTask private final AuthorizerMapper authorizerMapper; private final ChatHandlerProvider chatHandlerProvider; private final RowIngestionMetersFactory rowIngestionMetersFactory; + private final IndexingServiceClient indexingServiceClient; private final CoordinatorClient coordinatorClient; private final SegmentLoaderFactory segmentLoaderFactory; private final RetryPolicyFactory retryPolicyFactory; @@ -835,7 +859,7 @@ public class CompactionTask extends AbstractBatchIndexTask @Nullable private Granularity segmentGranularity; @Nullable - private IndexTuningConfig tuningConfig; + private ParallelIndexTuningConfig tuningConfig; @Nullable private Map context; @@ -845,6 +869,7 @@ public class CompactionTask extends AbstractBatchIndexTask AuthorizerMapper authorizerMapper, ChatHandlerProvider chatHandlerProvider, RowIngestionMetersFactory rowIngestionMetersFactory, + IndexingServiceClient indexingServiceClient, CoordinatorClient coordinatorClient, SegmentLoaderFactory segmentLoaderFactory, RetryPolicyFactory retryPolicyFactory, @@ -856,6 +881,7 @@ public class CompactionTask extends AbstractBatchIndexTask this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; + this.indexingServiceClient = indexingServiceClient; this.coordinatorClient = coordinatorClient; this.segmentLoaderFactory = segmentLoaderFactory; this.retryPolicyFactory = retryPolicyFactory; @@ -896,7 +922,7 @@ public class CompactionTask extends AbstractBatchIndexTask return this; } - public Builder tuningConfig(IndexTuningConfig tuningConfig) + public Builder tuningConfig(ParallelIndexTuningConfig tuningConfig) { this.tuningConfig = tuningConfig; return this; @@ -928,6 +954,7 @@ public class CompactionTask extends AbstractBatchIndexTask chatHandlerProvider, rowIngestionMetersFactory, coordinatorClient, + indexingServiceClient, segmentLoaderFactory, retryPolicyFactory, appenderatorsManager diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java index 7f5697a9b6e..e3a7632b48a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java @@ -28,6 +28,7 @@ import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskToolbox; @@ -196,7 +197,10 @@ public abstract class ParallelIndexPhaseRunner getSplitsIfSplittable(FirehoseFactory firehoseFactory) throws IOException + private static List getSplitsIfSplittable( + FirehoseFactory firehoseFactory, + @Nullable SplitHintSpec splitHintSpec + ) throws IOException { if (firehoseFactory instanceof FiniteFirehoseFactory) { final 
FiniteFirehoseFactory finiteFirehoseFactory = (FiniteFirehoseFactory) firehoseFactory;
-      return finiteFirehoseFactory.getSplits().collect(Collectors.toList());
+      return finiteFirehoseFactory.getSplits(splitHintSpec).collect(Collectors.toList());
     } else {
       throw new ISE("firehoseFactory[%s] is not splittable", firehoseFactory.getClass().getSimpleName());
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 7f593aeddca..265d38dd605 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -158,6 +158,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implements ChatHandler
   @JsonCreator
   public ParallelIndexSupervisorTask(
       @JsonProperty("id") String id,
+      @JsonProperty("groupId") @Nullable String groupId,
       @JsonProperty("resource") TaskResource taskResource,
       @JsonProperty("spec") ParallelIndexIngestionSpec ingestionSchema,
       @JsonProperty("context") Map context,
@@ -170,7 +171,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implements ChatHandler
   {
     super(
         getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
-        null,
+        groupId,
         taskResource,
         ingestionSchema.getDataSchema().getDataSource(),
         context
@@ -396,10 +397,12 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implements ChatHandler
           "firehoseFactory[%s] is not splittable. Running sequentially.",
           baseFirehoseFactory.getClass().getSimpleName()
       );
-    } else if (ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() == 1) {
+    } else if (ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() <= 1) {
       LOG.warn(
-          "maxNumConcurrentSubTasks is 1. Running sequentially. "
-          + "Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel ingestion mode."
+          "maxNumConcurrentSubTasks[%s] is less than or equal to 1. Running sequentially. "
+          + "Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel "
+          + "ingestion mode.",
+          ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks()
       );
     } else {
       throw new ISE("Unknown reason for sequential mode.
Failing this task."); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 64159d9f3f4..552c4048a80 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; import org.apache.druid.java.util.common.IAE; @@ -46,6 +47,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig private static final int DEFAULT_MAX_NUM_SEGMENTS_TO_MERGE = 100; private static final int DEFAULT_TOTAL_NUM_MERGE_TASKS = 10; + private final SplitHintSpec splitHintSpec; + private final int maxNumConcurrentSubTasks; private final int maxRetry; private final long taskStatusCheckPeriodMs; @@ -67,7 +70,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig */ private final int totalNumMergeTasks; - static ParallelIndexTuningConfig defaultConfig() + public static ParallelIndexTuningConfig defaultConfig() { return new ParallelIndexTuningConfig( null, @@ -94,6 +97,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig null, null, null, + null, null ); } @@ -106,6 +110,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows, @JsonProperty("numShards") @Deprecated @Nullable Integer numShards, + @JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec, @JsonProperty("partitionsSpec") @Nullable PartitionsSpec partitionsSpec, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @@ -117,7 +122,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig @JsonProperty("maxNumSubTasks") @Deprecated @Nullable Integer maxNumSubTasks, @JsonProperty("maxNumConcurrentSubTasks") @Nullable Integer maxNumConcurrentSubTasks, @JsonProperty("maxRetry") @Nullable Integer maxRetry, - @JsonProperty("taskStatusCheckPeriodMs") @Nullable Integer taskStatusCheckPeriodMs, + @JsonProperty("taskStatusCheckPeriodMs") @Nullable Long taskStatusCheckPeriodMs, @JsonProperty("chatHandlerTimeout") @Nullable Duration chatHandlerTimeout, @JsonProperty("chatHandlerNumRetries") @Nullable Integer chatHandlerNumRetries, @JsonProperty("maxNumSegmentsToMerge") @Nullable Integer maxNumSegmentsToMerge, @@ -154,6 +159,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig throw new IAE("Can't use both maxNumSubTasks and maxNumConcurrentSubTasks. Use maxNumConcurrentSubTasks instead"); } + this.splitHintSpec = splitHintSpec; + if (maxNumConcurrentSubTasks == null) { this.maxNumConcurrentSubTasks = maxNumSubTasks == null ? 
DEFAULT_MAX_NUM_CONCURRENT_SUB_TASKS : maxNumSubTasks; } else { @@ -182,6 +189,13 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig Preconditions.checkArgument(this.totalNumMergeTasks > 0, "totalNumMergeTasks must be positive"); } + @Nullable + @JsonProperty + public SplitHintSpec getSplitHintSpec() + { + return splitHintSpec; + } + @JsonProperty public int getMaxNumConcurrentSubTasks() { @@ -224,6 +238,39 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig return totalNumMergeTasks; } + @Override + public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) + { + return new ParallelIndexTuningConfig( + null, + null, + getMaxRowsInMemory(), + getMaxBytesInMemory(), + null, + null, + getSplitHintSpec(), + partitionsSpec, + getIndexSpec(), + getIndexSpecForIntermediatePersists(), + getMaxPendingPersists(), + isForceGuaranteedRollup(), + isReportParseExceptions(), + getPushTimeout(), + getSegmentWriteOutMediumFactory(), + null, + getMaxNumConcurrentSubTasks(), + getMaxRetry(), + getTaskStatusCheckPeriodMs(), + getChatHandlerTimeout(), + getChatHandlerNumRetries(), + getMaxNumSegmentsToMerge(), + getTotalNumMergeTasks(), + isLogParseExceptions(), + getMaxParseExceptions(), + getMaxSavedParseExceptions() + ); + } + @Override public boolean equals(Object o) { @@ -243,6 +290,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig chatHandlerNumRetries == that.chatHandlerNumRetries && maxNumSegmentsToMerge == that.maxNumSegmentsToMerge && totalNumMergeTasks == that.totalNumMergeTasks && + Objects.equals(splitHintSpec, that.splitHintSpec) && Objects.equals(chatHandlerTimeout, that.chatHandlerTimeout); } @@ -251,6 +299,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig { return Objects.hash( super.hashCode(), + splitHintSpec, maxNumConcurrentSubTasks, maxRetry, taskStatusCheckPeriodMs, @@ -260,4 +309,19 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig totalNumMergeTasks ); } + + @Override + public String toString() + { + return "ParallelIndexTuningConfig{" + + "splitHintSpec=" + splitHintSpec + + ", maxNumConcurrentSubTasks=" + maxNumConcurrentSubTasks + + ", maxRetry=" + maxRetry + + ", taskStatusCheckPeriodMs=" + taskStatusCheckPeriodMs + + ", chatHandlerTimeout=" + chatHandlerTimeout + + ", chatHandlerNumRetries=" + chatHandlerNumRetries + + ", maxNumSegmentsToMerge=" + maxNumSegmentsToMerge + + ", totalNumMergeTasks=" + totalNumMergeTasks + + "} " + super.toString(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateParallelIndexTaskRunner.java index 35061a1df52..edb00aa100d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateParallelIndexTaskRunner.java @@ -70,13 +70,13 @@ class PartialSegmentGenerateParallelIndexTaskRunner @Override Iterator> subTaskSpecIterator() throws IOException { - return baseFirehoseFactory.getSplits().map(this::newTaskSpec).iterator(); + return baseFirehoseFactory.getSplits(getTuningConfig().getSplitHintSpec()).map(this::newTaskSpec).iterator(); } @Override int getTotalNumSubTasks() throws IOException { - return baseFirehoseFactory.getNumSplits(); + 
return baseFirehoseFactory.getNumSplits(getTuningConfig().getSplitHintSpec()); } @VisibleForTesting diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java index 9139e797def..302f3fa6db1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java @@ -79,13 +79,13 @@ class SinglePhaseParallelIndexTaskRunner @Override Iterator> subTaskSpecIterator() throws IOException { - return baseFirehoseFactory.getSplits().map(this::newTaskSpec).iterator(); + return baseFirehoseFactory.getSplits(getTuningConfig().getSplitHintSpec()).map(this::newTaskSpec).iterator(); } @Override int getTotalNumSubTasks() throws IOException { - return baseFirehoseFactory.getNumSplits(); + return baseFirehoseFactory.getNumSplits(getTuningConfig().getSplitHintSpec()); } @VisibleForTesting diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 53969484128..c8211511fd0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -34,6 +34,8 @@ import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SegmentsSplitHintSpec; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.indexing.common.RetryPolicy; import org.apache.druid.indexing.common.RetryPolicyFactory; @@ -78,7 +80,6 @@ import java.util.stream.Stream; public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory> { private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class); - private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 150 * 1024 * 1024; private final String dataSource; // Exactly one of interval and segmentIds should be non-null. 
Typically 'interval' is specified directly
  // by the user creating this firehose and 'segmentIds' is used for sub-tasks if it is split for parallel
@@ -90,7 +91,8 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>>
   private final List<String> dimensions;
   private final List<String> metrics;
-  private final long maxInputSegmentBytesPerTask;
+  @Nullable
+  private final Long maxInputSegmentBytesPerTask;
   private final IndexIO indexIO;
   private final CoordinatorClient coordinatorClient;
   private final SegmentLoaderFactory segmentLoaderFactory;
@@ -108,7 +110,7 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>>
       @JsonProperty("dimensions") List<String> dimensions,
       @JsonProperty("metrics") List<String> metrics,
-      @JsonProperty("maxInputSegmentBytesPerTask") Long maxInputSegmentBytesPerTask,
+      @JsonProperty("maxInputSegmentBytesPerTask") @Deprecated @Nullable Long maxInputSegmentBytesPerTask,
       @JacksonInject IndexIO indexIO,
       @JacksonInject CoordinatorClient coordinatorClient,
       @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@@ -125,9 +127,7 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>>
-    this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null
-                                       ? DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK
-                                       : maxInputSegmentBytesPerTask;
+    this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask;
@@ ... @@
     return new ArrayList<>(timeline.values());
   }

-  private void initializeSplitsIfNeeded()
+  private void initializeSplitsIfNeeded(@Nullable SplitHintSpec splitHintSpec)
   {
     if (splits != null) {
       return;
     }

+    final SegmentsSplitHintSpec nonNullSplitHintSpec;
+    if (!(splitHintSpec instanceof SegmentsSplitHintSpec)) {
+      if (splitHintSpec != null) {
+        log.warn("Given splitHintSpec[%s] is not a SegmentsSplitHintSpec. Ignoring it.", splitHintSpec);
+      }
+      nonNullSplitHintSpec = new SegmentsSplitHintSpec(null);
+    } else {
+      nonNullSplitHintSpec = (SegmentsSplitHintSpec) splitHintSpec;
+    }
+
+    // The deprecated 'maxInputSegmentBytesPerTask' set on this factory, if any, takes precedence over the hint.
+    final long maxInputSegmentBytesPerTask = this.maxInputSegmentBytesPerTask == null
+                                             ? nonNullSplitHintSpec.getMaxInputSegmentBytesPerTask()
+                                             : this.maxInputSegmentBytesPerTask;
+
     // isSplittable() ensures this is only called when we have an interval.
final List> timelineSegments = getTimelineForInterval(); @@ -456,16 +471,16 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory>> getSplits() + public Stream>> getSplits(@Nullable SplitHintSpec splitHintSpec) { - initializeSplitsIfNeeded(); + initializeSplitsIfNeeded(splitHintSpec); return splits.stream(); } @Override - public int getNumSplits() + public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) { - initializeSplitsIfNeeded(); + initializeSplitsIfNeeded(splitHintSpec); return splits.size(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java index 079fb315508..a4ac7672b25 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java @@ -27,6 +27,9 @@ import org.apache.druid.client.indexing.ClientCompactQuery; import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig; import org.apache.druid.client.indexing.ClientCompactionIOConfig; import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; +import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.guice.GuiceAnnotationIntrospector; import org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; @@ -74,6 +77,7 @@ public class ClientCompactQuerySerdeTest 40000, 2000L, 30000L, + new SegmentsSplitHintSpec(100000L), new IndexSpec( new DefaultBitmapSerdeFactory(), CompressionStrategy.LZ4, @@ -81,7 +85,8 @@ public class ClientCompactQuerySerdeTest LongEncodingStrategy.LONGS ), null, - 1000L + 1000L, + 100 ), new HashMap<>() ); @@ -100,22 +105,36 @@ public class ClientCompactQuerySerdeTest ((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds() ); Assert.assertEquals( - query.getTuningConfig().getMaxRowsInMemory().intValue(), task.getTuningConfig().getMaxRowsInMemory() + query.getTuningConfig().getMaxRowsInMemory().intValue(), + task.getTuningConfig().getMaxRowsInMemory() ); Assert.assertEquals( - query.getTuningConfig().getMaxBytesInMemory().longValue(), task.getTuningConfig().getMaxBytesInMemory() + query.getTuningConfig().getMaxBytesInMemory().longValue(), + task.getTuningConfig().getMaxBytesInMemory() ); Assert.assertEquals( - query.getTuningConfig().getMaxRowsPerSegment(), task.getTuningConfig().getMaxRowsPerSegment() + query.getTuningConfig().getMaxRowsPerSegment(), + task.getTuningConfig().getMaxRowsPerSegment() ); Assert.assertEquals( - query.getTuningConfig().getMaxTotalRows(), task.getTuningConfig().getMaxTotalRows() + query.getTuningConfig().getMaxTotalRows(), + task.getTuningConfig().getMaxTotalRows() ); Assert.assertEquals( - query.getTuningConfig().getIndexSpec(), task.getTuningConfig().getIndexSpec() + query.getTuningConfig().getSplitHintSpec(), + task.getTuningConfig().getSplitHintSpec() ); Assert.assertEquals( - query.getTuningConfig().getPushTimeout().longValue(), task.getTuningConfig().getPushTimeout() + query.getTuningConfig().getIndexSpec(), + task.getTuningConfig().getIndexSpec() + ); + Assert.assertEquals( + query.getTuningConfig().getPushTimeout().longValue(), + task.getTuningConfig().getPushTimeout() + ); + 
Assert.assertEquals( + query.getTuningConfig().getMaxNumConcurrentSubTasks().intValue(), + task.getTuningConfig().getMaxNumConcurrentSubTasks() ); Assert.assertEquals(query.getContext(), task.getContext()); } @@ -143,6 +162,7 @@ public class ClientCompactQuerySerdeTest binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper)); binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER); + binder.bind(IndexingServiceClient.class).toInstance(new NoopIndexingServiceClient()); } ) ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java new file mode 100644 index 00000000000..330fed26701 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Files; +import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; +import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; +import org.apache.druid.indexing.common.task.batch.parallel.AbstractParallelIndexSupervisorTaskTest; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; +import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexingTest.TestSupervisorTask; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; +import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; +import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import org.apache.druid.server.security.AuthTestUtils; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +@RunWith(Parameterized.class) +public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervisorTaskTest +{ + @Parameterized.Parameters(name = "{0}") + public static Iterable constructorFeeder() + { + return ImmutableList.of( + new Object[]{LockGranularity.TIME_CHUNK}, + new Object[]{LockGranularity.SEGMENT} + ); + } + + private static final String DATA_SOURCE = "test"; + private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); + + private final AppenderatorsManager appenderatorsManager = new TestAppenderatorsManager(); + private final LockGranularity lockGranularity; + private final RowIngestionMetersFactory rowIngestionMetersFactory; + private final CoordinatorClient coordinatorClient; + + public CompactionTaskParallelRunTest(LockGranularity lockGranularity) + { + this.lockGranularity = lockGranularity; + this.rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); + coordinatorClient = 
new CoordinatorClient(null, null) + { + @Override + public List getDatabaseSegmentDataSourceSegments(String dataSource, List intervals) + { + return getStorageCoordinator().getUsedSegmentsForIntervals(dataSource, intervals); + } + + @Override + public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String segmentId) + { + ImmutableDruidDataSource druidDataSource = getMetadataSegmentManager().getImmutableDataSourceWithUsedSegments( + dataSource + ); + if (druidDataSource == null) { + throw new ISE("Unknown datasource[%s]", dataSource); + } + + for (SegmentId possibleSegmentId : SegmentId.iteratePossibleParsingsWithDataSource(dataSource, segmentId)) { + DataSegment segment = druidDataSource.getSegment(possibleSegmentId); + if (segment != null) { + return segment; + } + } + throw new ISE("Can't find segment for id[%s]", segmentId); + } + }; + } + + @Before + public void setup() throws IOException + { + indexingServiceClient = new LocalIndexingServiceClient(); + localDeepStorage = temporaryFolder.newFolder(); + } + + @After + public void teardown() + { + indexingServiceClient.shutdown(); + temporaryFolder.delete(); + } + + @Test + public void testRunParallel() throws Exception + { + runIndexTask(); + + final CompactionTask compactionTask = new TestCompactionTask( + null, + null, + DATA_SOURCE, + new CompactionIOConfig(new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null)), + null, + null, + null, + null, + newTuningConfig(), + null, + getObjectMapper(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + new NoopChatHandlerProvider(), + rowIngestionMetersFactory, + coordinatorClient, + indexingServiceClient, + getSegmentLoaderFactory(), + RETRY_POLICY_FACTORY, + appenderatorsManager + ); + + runTask(compactionTask); + } + + private void runIndexTask() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("2014-01-01T00:00:10Z,a,1\n"); + writer.write("2014-01-01T00:00:10Z,b,2\n"); + writer.write("2014-01-01T00:00:10Z,c,3\n"); + writer.write("2014-01-01T01:00:20Z,a,1\n"); + writer.write("2014-01-01T01:00:20Z,b,2\n"); + writer.write("2014-01-01T01:00:20Z,c,3\n"); + writer.write("2014-01-01T02:00:30Z,a,1\n"); + writer.write("2014-01-01T02:00:30Z,b,2\n"); + writer.write("2014-01-01T02:00:30Z,c,3\n"); + } + + IndexTask indexTask = new IndexTask( + null, + null, + IndexTaskTest.createIngestionSpec( + getObjectMapper(), + tmpDir, + CompactionTaskRunTest.DEFAULT_PARSE_SPEC, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.MINUTE, + null + ), + IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, null, false, true), + false + ), + null, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + new NoopChatHandlerProvider(), + rowIngestionMetersFactory, + appenderatorsManager + ); + + runTask(indexTask); + } + + private void runTask(Task task) throws Exception + { + actionClient = createActionClient(task); + toolbox = createTaskToolbox(task); + prepareTaskForLocking(task); + task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); + Assert.assertTrue(task.isReady(actionClient)); + Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode()); + shutdownTask(task); + } + + private static ParallelIndexTuningConfig newTuningConfig() + { + return new ParallelIndexTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + 
null, + null, + null, + null, + null, + null, + null, + 2, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } + + private static class TestCompactionTask extends CompactionTask + { + private final IndexingServiceClient indexingServiceClient; + + TestCompactionTask( + String id, + TaskResource taskResource, + String dataSource, + @Nullable CompactionIOConfig ioConfig, + @Nullable DimensionsSpec dimensions, + @Nullable DimensionsSpec dimensionsSpec, + @Nullable AggregatorFactory[] metricsSpec, + @Nullable Granularity segmentGranularity, + @Nullable ParallelIndexTuningConfig tuningConfig, + @Nullable Map context, + ObjectMapper jsonMapper, + AuthorizerMapper authorizerMapper, + ChatHandlerProvider chatHandlerProvider, + RowIngestionMetersFactory rowIngestionMetersFactory, + CoordinatorClient coordinatorClient, + @Nullable IndexingServiceClient indexingServiceClient, + SegmentLoaderFactory segmentLoaderFactory, + RetryPolicyFactory retryPolicyFactory, + AppenderatorsManager appenderatorsManager + ) + { + super( + id, + taskResource, + dataSource, + null, + null, + ioConfig, + dimensions, + dimensionsSpec, + metricsSpec, + segmentGranularity, + tuningConfig, + context, + jsonMapper, + authorizerMapper, + chatHandlerProvider, + rowIngestionMetersFactory, + coordinatorClient, + indexingServiceClient, + segmentLoaderFactory, + retryPolicyFactory, + appenderatorsManager + ); + this.indexingServiceClient = indexingServiceClient; + } + + @Override + ParallelIndexSupervisorTask newTask(String taskId, ParallelIndexIngestionSpec ingestionSpec) + { + return new TestSupervisorTask( + taskId, + null, + ingestionSpec, + createContextForSubtask(), + indexingServiceClient + ); + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index fd0e9aa5263..9a7fa51bad8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.ParseSpec; @@ -56,6 +58,7 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; +import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; @@ -76,6 +79,7 @@ import org.junit.runners.Parameterized; import javax.annotation.Nullable; import java.io.BufferedWriter; import java.io.File; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -90,15 +94,13 @@ import java.util.concurrent.Future; @RunWith(Parameterized.class) 
public class CompactionTaskRunTest extends IngestionTestBase { - public static final String DATA_SOURCE = "test"; - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public ExpectedException expectedException = ExpectedException.none(); - private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( + public static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( new TimestampSpec( "ts", "auto", @@ -137,18 +139,23 @@ public class CompactionTaskRunTest extends IngestionTestBase ); } + private static final String DATA_SOURCE = "test"; private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); private final RowIngestionMetersFactory rowIngestionMetersFactory; + private final IndexingServiceClient indexingServiceClient; private final CoordinatorClient coordinatorClient; private final SegmentLoaderFactory segmentLoaderFactory; private final LockGranularity lockGranularity; + private final AppenderatorsManager appenderatorsManager; + private ExecutorService exec; - private AppenderatorsManager appenderatorsManager; + private File localDeepStorage; public CompactionTaskRunTest(LockGranularity lockGranularity) { TestUtils testUtils = new TestUtils(); rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory(); + indexingServiceClient = new NoopIndexingServiceClient(); coordinatorClient = new CoordinatorClient(null, null) { @Override @@ -163,15 +170,17 @@ public class CompactionTaskRunTest extends IngestionTestBase } @Before - public void setup() + public void setup() throws IOException { exec = Execs.multiThreaded(2, "compaction-task-run-test-%d"); + localDeepStorage = temporaryFolder.newFolder(); } @After public void teardown() { exec.shutdownNow(); + temporaryFolder.delete(); } @Test @@ -183,8 +192,9 @@ public class CompactionTaskRunTest extends IngestionTestBase DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -228,8 +238,9 @@ public class CompactionTaskRunTest extends IngestionTestBase DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -306,8 +317,9 @@ public class CompactionTaskRunTest extends IngestionTestBase DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -350,7 +362,7 @@ public class CompactionTaskRunTest extends IngestionTestBase ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, appenderatorsManager ); @@ -404,8 +416,9 @@ public class CompactionTaskRunTest extends IngestionTestBase DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -459,8 +472,9 @@ public class CompactionTaskRunTest extends IngestionTestBase DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, 
RETRY_POLICY_FACTORY, @@ -506,8 +520,9 @@ public class CompactionTaskRunTest extends IngestionTestBase DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -564,8 +579,9 @@ public class CompactionTaskRunTest extends IngestionTestBase DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -675,7 +691,7 @@ public class CompactionTaskRunTest extends IngestionTestBase ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, appenderatorsManager ); @@ -696,31 +712,54 @@ public class CompactionTaskRunTest extends IngestionTestBase { getLockbox().add(task); getTaskStorage().insert(task, TaskStatus.running(task.getId())); - final TestLocalTaskActionClient actionClient = createActionClient(task); - final File deepStorageDir = temporaryFolder.newFolder(); final ObjectMapper objectMapper = getObjectMapper(); objectMapper.registerSubtypes( new NamedType(LocalLoadSpec.class, "local") ); objectMapper.registerSubtypes(LocalDataSegmentPuller.class); + final TaskToolbox box = createTaskToolbox(objectMapper, task); + + task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); + task.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true); + if (task.isReady(box.getTaskActionClient())) { + if (readyLatchToCountDown != null) { + readyLatchToCountDown.countDown(); + } + if (latchToAwaitBeforeRun != null) { + latchToAwaitBeforeRun.await(); + } + TaskStatus status = task.run(box); + shutdownTask(task); + final List segments = new ArrayList<>( + ((TestLocalTaskActionClient) box.getTaskActionClient()).getPublishedSegments() + ); + Collections.sort(segments); + return Pair.of(status, segments); + } else { + throw new ISE("task[%s] is not ready", task.getId()); + } + } + + private TaskToolbox createTaskToolbox(ObjectMapper objectMapper, Task task) throws IOException + { final SegmentLoader loader = new SegmentLoaderLocalCacheManager( getIndexIO(), new SegmentLoaderConfig() { @Override public List getLocations() { - return ImmutableList.of(new StorageLocationConfig(deepStorageDir, null, null)); + return ImmutableList.of(new StorageLocationConfig(localDeepStorage, null, null)); } }, objectMapper ); - final TaskToolbox box = new TaskToolbox( + return new TaskToolbox( null, null, - actionClient, + createActionClient(task), null, new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()), new NoopDataSegmentKiller(), @@ -747,23 +786,5 @@ public class CompactionTaskRunTest extends IngestionTestBase new NoopTestTaskReportFileWriter(), null ); - - task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); - task.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true); - if (task.isReady(box.getTaskActionClient())) { - if (readyLatchToCountDown != null) { - readyLatchToCountDown.countDown(); - } - if (latchToAwaitBeforeRun != null) { - latchToAwaitBeforeRun.await(); - } - TaskStatus status = task.run(box); - shutdownTask(task); - final List segments = new ArrayList<>(actionClient.getPublishedSegments()); - Collections.sort(segments); - return Pair.of(status, segments); - } else { - throw new ISE("task[%s] is not ready", task.getId()); - } } } diff 
--git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index f8fe11a53a9..3253fdc4f67 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -29,6 +29,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -55,9 +57,9 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.CompactionTask.Builder; import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager; import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider; -import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; -import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; -import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; @@ -148,12 +150,13 @@ public class CompactionTaskTest Intervals.of("2017-06-01/2017-07-01") ); private static final Map MIXED_TYPE_COLUMN_MAP = new HashMap<>(); - private static final IndexTuningConfig TUNING_CONFIG = createTuningConfig(); + private static final ParallelIndexTuningConfig TUNING_CONFIG = createTuningConfig(); private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils() .getRowIngestionMetersFactory(); private static final Map SEGMENT_MAP = new HashMap<>(); private static final CoordinatorClient COORDINATOR_CLIENT = new TestCoordinatorClient(SEGMENT_MAP); + private static final IndexingServiceClient INDEXING_SERVICE_CLIENT = new NoopIndexingServiceClient(); private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager(); private static final ObjectMapper OBJECT_MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper()); private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); @@ -250,6 +253,7 @@ public class CompactionTaskTest binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper)); binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER); + binder.bind(IndexingServiceClient.class).toInstance(INDEXING_SERVICE_CLIENT); } ) ) @@ -277,9 +281,9 @@ public class CompactionTaskTest return dimensions; } - private static IndexTuningConfig createTuningConfig() + private static ParallelIndexTuningConfig createTuningConfig() { - return new
IndexTuningConfig( + return new ParallelIndexTuningConfig( null, null, // null to compute maxRowsPerSegment automatically 500000, @@ -288,7 +292,6 @@ public class CompactionTaskTest null, null, null, - null, new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -296,11 +299,18 @@ public class CompactionTaskTest LongEncodingStrategy.LONGS ), null, - 5000, + null, true, false, + 5000L, + null, + null, + null, + null, + null, + null, + null, null, - 100L, null, null, null, @@ -332,6 +342,7 @@ public class CompactionTaskTest AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, + INDEXING_SERVICE_CLIENT, COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -359,6 +370,7 @@ public class CompactionTaskTest AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, + INDEXING_SERVICE_CLIENT, COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -384,6 +396,7 @@ public class CompactionTaskTest AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, + INDEXING_SERVICE_CLIENT, COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -424,7 +437,7 @@ public class CompactionTaskTest @Test public void testCreateIngestionSchema() throws IOException, SegmentLoadingException { - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -457,8 +470,8 @@ public class CompactionTaskTest @Test public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOException, SegmentLoadingException { - final IndexTuningConfig tuningConfig = new IndexTuningConfig( - null, + final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( + 100000, null, 500000, 1000000L, @@ -466,7 +479,6 @@ public class CompactionTaskTest null, null, null, - new HashedPartitionsSpec(6, null, null), new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -474,17 +486,24 @@ public class CompactionTaskTest LongEncodingStrategy.LONGS ), null, - 5000, + null, true, false, null, - 100L, + null, + null, + 10, + null, + null, + null, + null, + null, null, null, null, null ); - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(tuningConfig), @@ -518,16 +537,15 @@ public class CompactionTaskTest @Test public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, SegmentLoadingException { - final IndexTuningConfig tuningConfig = new IndexTuningConfig( + final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( null, null, 500000, 1000000L, + 1000000L, null, null, null, - null, - new HashedPartitionsSpec(null, 6, null), new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -535,17 +553,24 @@ public class CompactionTaskTest LongEncodingStrategy.LONGS ), null, - 5000, - true, - false, null, - 100L, + false, + false, + 5000L, + null, + null, + null, + null, + null, + null, + null, + null, null, null, null, null ); - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(tuningConfig), @@ -579,7 +604,7 @@ public class CompactionTaskTest @Test public void testCreateIngestionSchemaWithNumShards() throws IOException, SegmentLoadingException { - final IndexTuningConfig tuningConfig = new IndexTuningConfig( + final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( null, null, 500000, @@ -587,7 +612,6 @@ public class CompactionTaskTest null, null, null, - null, new HashedPartitionsSpec(null, 3, null), new IndexSpec( new RoaringBitmapSerdeFactory(true), @@ -596,17 +620,24 @@ public class CompactionTaskTest LongEncodingStrategy.LONGS ), null, - 5000, + null, true, false, + 5000L, + null, + null, + 10, + null, + null, + null, + null, null, - 100L, null, null, null, null ); - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(tuningConfig), @@ -667,7 +698,7 @@ public class CompactionTaskTest ) ); - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -708,7 +739,7 @@ public class CompactionTaskTest new DoubleMaxAggregatorFactory("custom_double_max", "agg_4") }; - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -742,7 +773,7 @@ public class CompactionTaskTest @Test public void testCreateIngestionSchemaWithCustomSegments() throws IOException, SegmentLoadingException { - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -831,6 +862,7 @@ public class CompactionTaskTest AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, + INDEXING_SERVICE_CLIENT, COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -845,7 +877,7 @@ public class CompactionTaskTest @Test public void testSegmentGranularity() throws IOException, SegmentLoadingException { - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -880,7 +912,7 @@ public class CompactionTaskTest @Test public void testNullSegmentGranularityAnd() throws IOException, SegmentLoadingException { - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -955,7 +987,7 @@ public class CompactionTaskTest } private void assertIngestionSchema( - List ingestionSchemas, + List ingestionSchemas, List expectedDimensionsSpecs, List expectedMetricsSpec, List 
expectedSegmentIntervals, @@ -967,7 +999,7 @@ public class CompactionTaskTest expectedDimensionsSpecs, expectedMetricsSpec, expectedSegmentIntervals, - new IndexTuningConfig( + new ParallelIndexTuningConfig( null, null, 500000, @@ -975,8 +1007,7 @@ public class CompactionTaskTest Long.MAX_VALUE, null, null, - null, - new HashedPartitionsSpec(null, null, null), // automatically computed targetPartitionSize + new HashedPartitionsSpec(5000000, null, null), // automatically computed targetPartitionSize new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -984,11 +1015,18 @@ public class CompactionTaskTest LongEncodingStrategy.LONGS ), null, - 5000, + null, true, false, + 5000L, + null, + null, + null, + null, + null, + null, + null, null, - 100L, null, null, null, @@ -999,11 +1037,11 @@ public class CompactionTaskTest } private void assertIngestionSchema( - List ingestionSchemas, + List ingestionSchemas, List expectedDimensionsSpecs, List expectedMetricsSpec, List expectedSegmentIntervals, - IndexTuningConfig expectedTuningConfig, + ParallelIndexTuningConfig expectedTuningConfig, Granularity expectedSegmentGranularity ) { @@ -1015,7 +1053,7 @@ public class CompactionTaskTest ); for (int i = 0; i < ingestionSchemas.size(); i++) { - final IndexIngestionSpec ingestionSchema = ingestionSchemas.get(i); + final ParallelIndexIngestionSpec ingestionSchema = ingestionSchemas.get(i); final DimensionsSpec expectedDimensionsSpec = expectedDimensionsSpecs.get(i); // assert dataSchema @@ -1048,7 +1086,7 @@ public class CompactionTaskTest ); // assert ioConfig - final IndexIOConfig ioConfig = ingestionSchema.getIOConfig(); + final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig(); Assert.assertFalse(ioConfig.isAppendToExisting()); final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory(); Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index f49a90f27b1..dcaffb395de 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -24,6 +24,7 @@ import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; @@ -58,6 +59,7 @@ import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.NoopDataSegmentKiller; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.server.DruidNode; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; @@ -86,6 +88,7 @@ public abstract class IngestionTestBase private final TestUtils testUtils = new TestUtils(); private final ObjectMapper objectMapper = testUtils.getTestObjectMapper(); + private SegmentLoaderFactory segmentLoaderFactory; private TaskStorage taskStorage; 
private IndexerSQLMetadataStorageCoordinator storageCoordinator; private MetadataSegmentManager segmentManager; @@ -112,6 +115,7 @@ public abstract class IngestionTestBase derbyConnectorRule.getConnector() ); lockbox = new TaskLockbox(taskStorage, storageCoordinator); + segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper()); } @After @@ -136,6 +140,11 @@ public abstract class IngestionTestBase lockbox.remove(task); } + public SegmentLoader newSegmentLoader(File storageDir) + { + return segmentLoaderFactory.manufacturate(storageDir); + } + public ObjectMapper getObjectMapper() { return objectMapper; @@ -146,6 +155,11 @@ public abstract class IngestionTestBase return taskStorage; } + public SegmentLoaderFactory getSegmentLoaderFactory() + { + return segmentLoaderFactory; + } + public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator() { return storageCoordinator; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index a8e282ed58d..974cfb3687d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -99,10 +99,10 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase 0 ); - TestLocalTaskActionClient actionClient; - LocalIndexingServiceClient indexingServiceClient; - TaskToolbox toolbox; - File localDeepStorage; + protected TestLocalTaskActionClient actionClient; + protected LocalIndexingServiceClient indexingServiceClient; + protected TaskToolbox toolbox; + protected File localDeepStorage; @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -128,7 +128,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase ); } - class LocalIndexingServiceClient extends NoopIndexingServiceClient + public class LocalIndexingServiceClient extends NoopIndexingServiceClient { private final ConcurrentMap> tasks = new ConcurrentHashMap<>(); private final ListeningExecutorService service = MoreExecutors.listeningDecorator( @@ -246,13 +246,13 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase } } - void shutdown() + public void shutdown() { service.shutdownNow(); } } - TaskToolbox createTaskToolbox(Task task) throws IOException + protected TaskToolbox createTaskToolbox(Task task) throws IOException { return new TaskToolbox( null, @@ -278,7 +278,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase null, null, null, - null, + newSegmentLoader(temporaryFolder.newFolder()), getObjectMapper(), temporaryFolder.newFolder(task.getId()), getIndexIO(), @@ -307,6 +307,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase { super( id, + null, taskResource, ingestionSchema, context, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java index eb984c879ac..6501479e0f5 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java @@ -192,6 +192,7 @@ public class MultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervi null, null, null, + null, partitionsSpec, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index ee33a7ad9c5..463ef4db5e6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.Iterables; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -161,7 +162,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu ) { final TestFirehoseFactory firehoseFactory = (TestFirehoseFactory) ioConfig.getFirehoseFactory(); - final int numTotalSubTasks = firehoseFactory.getNumSplits(); + final int numTotalSubTasks = firehoseFactory.getNumSplits(null); // set up ingestion spec final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec( new DataSchema( @@ -201,6 +202,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu null, null, null, + null, numTotalSubTasks, null, null, @@ -256,13 +258,13 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu } @Override - public Stream> getSplits() + public Stream> getSplits(@Nullable SplitHintSpec splitHintSpec) { return splits.stream(); } @Override - public int getNumSplits() + public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) { return splits.size(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 94f67781f80..d55ce70a61b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -25,6 +25,7 @@ import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; @@ -432,6 +433,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd null, null, null, + null, NUM_SUB_TASKS, null, 
null, @@ -465,13 +467,13 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd } @Override - public Stream> getSplits() + public Stream> getSplits(@Nullable SplitHintSpec splitHintSpec) { return ids.stream().map(InputSplit::new); } @Override - public int getNumSplits() + public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) { return ids.size(); } @@ -596,7 +598,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd new LocalParallelIndexTaskClientFactory(supervisorTask) ); final TestFirehose firehose = (TestFirehose) getIngestionSpec().getIOConfig().getFirehoseFactory(); - final InputSplit split = firehose.getSplits().findFirst().orElse(null); + final InputSplit split = firehose.getSplits(null).findFirst().orElse(null); if (split == null) { throw new ISE("Split is null"); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index a5c1e97ba41..c8857aaf105 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -156,6 +156,7 @@ public class ParallelIndexSupervisorTaskSerdeTest { return new ParallelIndexSupervisorTask( ID, + null, taskResource, ingestionSpec, context, @@ -248,6 +249,7 @@ public class ParallelIndexSupervisorTaskSerdeTest null, null, null, + null, partitionsSpec, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index b8d081a2878..f9577df97b6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -71,6 +71,7 @@ public class ParallelIndexTuningConfigTest 1000L, null, null, + null, new DynamicPartitionsSpec(100, 100L), new IndexSpec( new RoaringBitmapSerdeFactory(true), @@ -87,7 +88,7 @@ public class ParallelIndexTuningConfigTest null, 250, 100, - 20, + 20L, new Duration(3600), 128, null, @@ -112,6 +113,7 @@ public class ParallelIndexTuningConfigTest 1000L, null, null, + null, new DynamicPartitionsSpec(100, 100L), new IndexSpec( new RoaringBitmapSerdeFactory(true), @@ -128,7 +130,7 @@ public class ParallelIndexTuningConfigTest null, maxNumConcurrentSubTasks, 100, - 20, + 20L, new Duration(3600), 128, null, @@ -153,6 +155,7 @@ public class ParallelIndexTuningConfigTest 1000L, null, null, + null, new DynamicPartitionsSpec(100, 100L), new IndexSpec( new RoaringBitmapSerdeFactory(true), @@ -169,7 +172,7 @@ public class ParallelIndexTuningConfigTest maxNumSubTasks, null, 100, - 20, + 20L, new Duration(3600), 128, null, @@ -196,6 +199,7 @@ public class ParallelIndexTuningConfigTest 1000L, null, null, + null, new DynamicPartitionsSpec(100, 100L), new IndexSpec( new RoaringBitmapSerdeFactory(true), @@ -212,7 +216,7 @@ public class ParallelIndexTuningConfigTest maxNumSubTasks, maxNumSubTasks, 100, - 20, + 20L, new Duration(3600), 128, null, diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 24f19cd3503..0c17b8c77ab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -301,6 +301,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv null, null, null, + null, 1, null, null, @@ -373,6 +374,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv null, null, null, + null, 2, null, null, @@ -431,9 +433,9 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv ); } - private static class TestSupervisorTask extends TestParallelIndexSupervisorTask + public static class TestSupervisorTask extends TestParallelIndexSupervisorTask { - TestSupervisorTask( + public TestSupervisorTask( String id, TaskResource taskResource, ParallelIndexIngestionSpec ingestionSchema, @@ -451,7 +453,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv } } - private static class TestSinglePhaseRunner extends TestSinglePhaseParallelIndexTaskRunner + public static class TestSinglePhaseRunner extends TestSinglePhaseParallelIndexTaskRunner { private final ParallelIndexSupervisorTask supervisorTask; @@ -496,7 +498,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv } } - private static class TestSinglePhaseSubTaskSpec extends SinglePhaseSubTaskSpec + public static class TestSinglePhaseSubTaskSpec extends SinglePhaseSubTaskSpec { private final ParallelIndexSupervisorTask supervisorTask; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 4944d276522..ec250c4217d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -163,11 +163,11 @@ public class IngestSegmentFirehoseFactoryTimelineTest private void testSplit() throws Exception { Assert.assertTrue(factory.isSplittable()); - final int numSplits = factory.getNumSplits(); + final int numSplits = factory.getNumSplits(null); // We set maxInputSegmentBytesPerSplit to 2 so that each segment should become a split.
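// (Every test segment is larger than 2 bytes, so no two segments can be packed into the same split and the number of splits should equal the number of segments.)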
Assert.assertEquals(segmentCount, numSplits); final List>> splits = - factory.getSplits().collect(Collectors.toList()); + factory.getSplits(null).collect(Collectors.toList()); Assert.assertEquals(numSplits, splits.size()); int count = 0; diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java index cedacc8498d..c99a7cb0bf4 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java @@ -21,6 +21,7 @@ package org.apache.druid.client.indexing; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.segment.IndexSpec; import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig; @@ -38,28 +39,46 @@ public class ClientCompactQueryTuningConfig @Nullable private final Long maxTotalRows; @Nullable + private final SplitHintSpec splitHintSpec; + @Nullable private final IndexSpec indexSpec; @Nullable private final Integer maxPendingPersists; @Nullable private final Long pushTimeout; + @Nullable + private final Integer maxNumConcurrentSubTasks; public static ClientCompactQueryTuningConfig from( @Nullable UserCompactTuningConfig userCompactionTaskQueryTuningConfig, @Nullable Integer maxRowsPerSegment ) { - return new ClientCompactQueryTuningConfig( - maxRowsPerSegment, - userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(), - userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxBytesInMemory(), - userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxTotalRows(), - userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getIndexSpec(), - userCompactionTaskQueryTuningConfig == null - ? null - : userCompactionTaskQueryTuningConfig.getMaxPendingPersists(), - userCompactionTaskQueryTuningConfig == null ? 
null : userCompactionTaskQueryTuningConfig.getPushTimeout() - ); + if (userCompactionTaskQueryTuningConfig == null) { + return new ClientCompactQueryTuningConfig( + maxRowsPerSegment, + null, + null, + null, + null, + null, + null, + null, + null + ); + } else { + return new ClientCompactQueryTuningConfig( + maxRowsPerSegment, + userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(), + userCompactionTaskQueryTuningConfig.getMaxBytesInMemory(), + userCompactionTaskQueryTuningConfig.getMaxTotalRows(), + userCompactionTaskQueryTuningConfig.getSplitHintSpec(), + userCompactionTaskQueryTuningConfig.getIndexSpec(), + userCompactionTaskQueryTuningConfig.getMaxPendingPersists(), + userCompactionTaskQueryTuningConfig.getPushTimeout(), + userCompactionTaskQueryTuningConfig.getMaxNumConcurrentSubTasks() + ); + } } @JsonCreator @@ -68,24 +87,28 @@ public class ClientCompactQueryTuningConfig @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, - @JsonProperty("pushTimeout") @Nullable Long pushTimeout + @JsonProperty("pushTimeout") @Nullable Long pushTimeout, + @JsonProperty("maxNumConcurrentSubTasks") @Nullable Integer maxNumConcurrentSubTasks ) { this.maxRowsPerSegment = maxRowsPerSegment; this.maxBytesInMemory = maxBytesInMemory; this.maxRowsInMemory = maxRowsInMemory; this.maxTotalRows = maxTotalRows; + this.splitHintSpec = splitHintSpec; this.indexSpec = indexSpec; this.maxPendingPersists = maxPendingPersists; this.pushTimeout = pushTimeout; + this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks; } @JsonProperty public String getType() { - return "index"; + return "index_parallel"; } @JsonProperty @@ -116,6 +139,13 @@ public class ClientCompactQueryTuningConfig return maxTotalRows; } + @Nullable + @JsonProperty + public SplitHintSpec getSplitHintSpec() + { + return splitHintSpec; + } + public long getMaxTotalRowsOr(long defaultMaxTotalRows) { return maxTotalRows == null ? 
defaultMaxTotalRows : maxTotalRows; @@ -142,6 +172,13 @@ public class ClientCompactQueryTuningConfig return pushTimeout; } + @JsonProperty + @Nullable + public Integer getMaxNumConcurrentSubTasks() + { + return maxNumConcurrentSubTasks; + } + @Override public boolean equals(Object o) { @@ -158,7 +195,9 @@ public class ClientCompactQueryTuningConfig Objects.equals(maxTotalRows, that.maxTotalRows) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(maxPendingPersists, that.maxPendingPersists) && - Objects.equals(pushTimeout, that.pushTimeout); + Objects.equals(pushTimeout, that.pushTimeout) && + Objects.equals(splitHintSpec, that.splitHintSpec) && + Objects.equals(maxNumConcurrentSubTasks, that.maxNumConcurrentSubTasks); } @Override @@ -171,7 +209,9 @@ public class ClientCompactQueryTuningConfig maxTotalRows, indexSpec, maxPendingPersists, - pushTimeout + pushTimeout, + splitHintSpec, + maxNumConcurrentSubTasks ); } @@ -186,6 +225,8 @@ public class ClientCompactQueryTuningConfig ", indexSpec=" + indexSpec + ", maxPendingPersists=" + maxPendingPersists + ", pushTimeout=" + pushTimeout + + ", splitHintSpec=" + splitHintSpec + + ", maxNumConcurrentSubTasks=" + maxNumConcurrentSubTasks + '}'; } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java index dec2ab045bb..31cb38c8a58 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.StringInputRowParser; import javax.annotation.Nullable; @@ -84,13 +85,13 @@ public class InlineFirehoseFactory implements FiniteFirehoseFactory> getSplits() + public Stream> getSplits(@Nullable SplitHintSpec splitHintSpec) { return Stream.of(new InputSplit<>(data)); } @Override - public int getNumSplits() + public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) { return 1; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index da5cde019a3..c59de7f1102 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.segment.IndexSpec; import org.joda.time.Period; @@ -158,9 +159,11 @@ public class DataSourceCompactionConfig @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, - @JsonProperty("pushTimeout") @Nullable Long pushTimeout +
@JsonProperty("pushTimeout") @Nullable Long pushTimeout, + @JsonProperty("maxNumConcurrentSubTasks") @Nullable Integer maxNumConcurrentSubTasks ) { super( @@ -168,9 +171,11 @@ public class DataSourceCompactionConfig maxRowsInMemory, maxBytesInMemory, maxTotalRows, + splitHintSpec, indexSpec, maxPendingPersists, - pushTimeout + pushTimeout, + maxNumConcurrentSubTasks ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index e6a6705385b..ae9a9ac88fb 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -92,6 +92,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper final List compactTasks = filterNonCompactTasks(indexingServiceClient.getActiveTasks()); // dataSource -> list of intervals of compact tasks final Map> compactTaskIntervals = new HashMap<>(compactionConfigList.size()); + int numEstimatedNonCompleteCompactionTasks = 0; for (TaskStatusPlus status : compactTasks) { final TaskPayloadResponse response = indexingServiceClient.getTaskPayload(status.getId()); if (response == null) { @@ -101,6 +102,8 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload(); final Interval interval = compactQuery.getIoConfig().getInputSpec().getInterval(); compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval); + final int numSubTasks = findNumMaxConcurrentSubTasks(compactQuery.getTuningConfig()); + numEstimatedNonCompleteCompactionTasks += numSubTasks + 1; // count the compaction task itself } else { throw new ISE("WTH? task[%s] is not a compactTask?", status.getId()); } @@ -112,13 +115,19 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(), dynamicConfig.getMaxCompactionTaskSlots() ); - final int numNonCompleteCompactionTasks = compactTasks.size(); - final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0 - ? Math.max(0, compactionTaskCapacity - numNonCompleteCompactionTasks) - // compactionTaskCapacity might be 0 if totalWorkerCapacity is low. - // This guarantees that at least one slot is available if - // compaction is enabled and numRunningCompactTasks is 0. - : Math.max(1, compactionTaskCapacity); + final int numAvailableCompactionTaskSlots; + if (numEstimatedNonCompleteCompactionTasks > 0) { + numAvailableCompactionTaskSlots = Math.max( + 0, + compactionTaskCapacity - numEstimatedNonCompleteCompactionTasks + ); + } else { + // compactionTaskCapacity might be 0 if totalWorkerCapacity is low. + // This guarantees that at least one slot is available if + // compaction is enabled and numRunningCompactTasks is 0. + numAvailableCompactionTaskSlots = Math.max(1, compactionTaskCapacity); + } + LOG.info( "Found [%d] available task slots for compaction out of [%d] max compaction task capacity", numAvailableCompactionTaskSlots, @@ -141,6 +150,25 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper .build(); } + /** + * Each compaction task can run a parallel indexing task. 
When we count the number of currently running + * compaction tasks, we should count the sub tasks of the parallel indexing task as well. However, we currently + * don't have a good way to get the number of currently running sub tasks short of querying each supervisor task, + * which would be complex since it would have to handle all kinds of failures. Here, we simply return + * {@code maxNumConcurrentSubTasks} instead to estimate the number of sub tasks conservatively. This should be fine + * since it won't affect the performance of other ingestion types. + */ + private int findNumMaxConcurrentSubTasks(@Nullable ClientCompactQueryTuningConfig tuningConfig) + { + if (tuningConfig != null && tuningConfig.getMaxNumConcurrentSubTasks() != null) { + // The actual number of subtasks might be smaller than the configured max. + // However, we use the max to simplify the estimation here. + return tuningConfig.getMaxNumConcurrentSubTasks(); + } else { + return 0; + } + } + private static List filterNonCompactTasks(List taskStatuses) { return taskStatuses @@ -164,7 +192,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper { int numSubmittedTasks = 0; - for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots; numSubmittedTasks++) { + while (iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots) { final List segmentsToCompact = iterator.next(); if (!segmentsToCompact.isEmpty()) { @@ -182,6 +210,8 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper taskId, Iterables.transform(segmentsToCompact, DataSegment::getId) ); + // Count the compaction task itself + its sub tasks + numSubmittedTasks += findNumMaxConcurrentSubTasks(config.getTuningConfig()) + 1; } else { throw new ISE("segmentsToCompact is empty?"); } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java index 6bc9a0e1213..58671e82a52 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java @@ -85,7 +85,7 @@ public class InlineFirehoseFactoryTest { Assert.assertTrue(target instanceof FiniteFirehoseFactory); Assert.assertFalse(target.isSplittable()); - Assert.assertEquals(1, target.getNumSplits()); + Assert.assertEquals(1, target.getNumSplits(null)); } @Test(expected = NullPointerException.class) @@ -115,7 +115,7 @@ public class InlineFirehoseFactoryTest @Test public void testForcedSplitAndClone() { - Optional> inputSplitOptional = target.getSplits().findFirst(); + Optional> inputSplitOptional = target.getSplits(null).findFirst(); Assert.assertTrue(inputSplitOptional.isPresent()); FiniteFirehoseFactory cloneWithSplit = target.withSplit(inputSplitOptional.get()); Assert.assertTrue(cloneWithSplit instanceof InlineFirehoseFactory); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index 5d5b9df3a09..7b9e3b2698b 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator; import com.fasterxml.jackson.databind.ObjectMapper; import
com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; @@ -93,7 +94,7 @@ public class DataSourceCompactionConfigTest @Test public void testSerdeUserCompactTuningConfig() throws IOException { - final UserCompactTuningConfig config = new UserCompactTuningConfig(null, null, null, null, null, null); + final UserCompactTuningConfig config = new UserCompactTuningConfig(null, null, null, null, null, null, null, null); final String json = OBJECT_MAPPER.writeValueAsString(config); // Check maxRowsPerSegment doesn't exist in the JSON string Assert.assertFalse(json.contains("maxRowsPerSegment")); @@ -116,6 +117,8 @@ public class DataSourceCompactionConfigTest 10000L, null, null, + null, + null, null ), ImmutableMap.of("key", "val") @@ -147,6 +150,8 @@ public class DataSourceCompactionConfigTest 10000L, null, null, + null, + null, null ), ImmutableMap.of("key", "val") @@ -171,6 +176,7 @@ public class DataSourceCompactionConfigTest 1000, 10000L, 2000L, + new SegmentsSplitHintSpec(10000L), new IndexSpec( new RoaringBitmapSerdeFactory(false), CompressionStrategy.LZF, @@ -178,7 +184,8 @@ public class DataSourceCompactionConfigTest LongEncodingStrategy.LONGS ), 1, - 3000L + 3000L, + null ); final String json = OBJECT_MAPPER.writeValueAsString(tuningConfig); diff --git a/website/.spelling b/website/.spelling index 0ed9a264902..f1fdcdb6fdb 100644 --- a/website/.spelling +++ b/website/.spelling @@ -918,6 +918,8 @@ InlineFirehose LocalFirehose PartitionsSpec PasswordProviders +SegmentsSplitHintSpec +SplitHintSpec appendToExisting baseDir chatHandlerNumRetries @@ -938,6 +940,7 @@ reportParseExceptions segmentWriteOutMediumFactory sql sqls +splitHintSpec taskStatusCheckPeriodMs timeChunk totalNumMergeTasks