Auto compaction based on parallel indexing (#8570)

* Auto compaction based on parallel indexing

* javadoc and doc

* typo

* update spell

* addressing comments

* address comments

* fix log

* fix build

* fix test

* increase default max input segment bytes per task

* fix test
Jihoon Son 2019-10-18 13:24:14 -07:00 committed by GitHub
parent 1ca859584f
commit 30c15900be
35 changed files with 1010 additions and 236 deletions

View File

@ -22,6 +22,7 @@ package org.apache.druid.data.input;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.druid.data.input.impl.InputRowParser;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.stream.Stream;
@ -52,13 +53,13 @@ public interface FiniteFirehoseFactory<T extends InputRowParser, S> extends Fire
* lazily so that the listing overhead could be amortized.
*/
@JsonIgnore
Stream<InputSplit<S>> getSplits() throws IOException;
Stream<InputSplit<S>> getSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException;
/**
* Returns number of splits returned by {@link #getSplits()}.
* Returns number of splits returned by {@link #getSplits}.
*/
@JsonIgnore
int getNumSplits() throws IOException;
int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException;
/**
* Returns the same {@link FiniteFirehoseFactory} but with the given {@link InputSplit}. The returned

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* {@link SplitHintSpec} for IngestSegmentFirehoseFactory.
*/
public class SegmentsSplitHintSpec implements SplitHintSpec
{
public static final String TYPE = "segments";
private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 500 * 1024 * 1024;
/**
* Maximum number of bytes of input segments to process in a single task.
* If a single segment is larger than this number, it will be processed by itself in a single task.
*/
private final long maxInputSegmentBytesPerTask;
@JsonCreator
public SegmentsSplitHintSpec(
@JsonProperty("maxInputSegmentBytesPerTask") @Nullable Long maxInputSegmentBytesPerTask
)
{
this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null
? DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK
: maxInputSegmentBytesPerTask;
}
@JsonProperty
public long getMaxInputSegmentBytesPerTask()
{
return maxInputSegmentBytesPerTask;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentsSplitHintSpec that = (SegmentsSplitHintSpec) o;
return maxInputSegmentBytesPerTask == that.maxInputSegmentBytesPerTask;
}
@Override
public int hashCode()
{
return Objects.hash(maxInputSegmentBytesPerTask);
}
@Override
public String toString()
{
return "SegmentsSplitHintSpec{" +
"maxInputSegmentBytesPerTask=" + maxInputSegmentBytesPerTask +
'}';
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* In native parallel indexing, the supervisor task partitions input data into splits and assigns each of them
* to a single sub task. How splits are created mainly depends on the input file format, but sometimes Druid users
* want to give hints to control the amount of data each sub task will read. SplitHintSpec can be used for this
* purpose. Implementations can ignore the given hint.
*
* @see FiniteFirehoseFactory#getSplits(SplitHintSpec)
* @see FiniteFirehoseFactory#getNumSplits(SplitHintSpec)
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = SegmentsSplitHintSpec.TYPE, value = SegmentsSplitHintSpec.class)
})
public interface SplitHintSpec
{
}
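
To illustrate the contract described in the javadoc above, here is a minimal sketch (not part of this patch) of how a splittable firehose factory might interpret the hint; the class name `SplitHintResolution` and the fallback constant are hypothetical, and an implementation is equally free to ignore the hint altogether.

```java
import javax.annotation.Nullable;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.data.input.SplitHintSpec;

final class SplitHintResolution
{
  // Hypothetical fallback used when no usable hint is given.
  private static final long DEFAULT_BYTES_PER_SPLIT = 500L * 1024 * 1024;

  private SplitHintResolution()
  {
  }

  /**
   * Resolves the maximum bytes per split from the given hint, falling back to a default
   * when the hint is null or of an unexpected type, similar to how
   * IngestSegmentFirehoseFactory handles a non-segments hint.
   */
  static long resolveMaxBytesPerSplit(@Nullable SplitHintSpec splitHintSpec)
  {
    if (splitHintSpec instanceof SegmentsSplitHintSpec) {
      return ((SegmentsSplitHintSpec) splitHintSpec).getMaxInputSegmentBytesPerTask();
    }
    // Implementations can ignore hints they don't understand.
    return DEFAULT_BYTES_PER_SPLIT;
  }
}
```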

View File

@ -26,8 +26,10 @@ import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@ -102,14 +104,14 @@ public abstract class AbstractTextFilesFirehoseFactory<T>
}
@Override
public Stream<InputSplit<T>> getSplits() throws IOException
public Stream<InputSplit<T>> getSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException
{
initializeObjectsIfNeeded();
return getObjects().stream().map(InputSplit::new);
}
@Override
public int getNumSplits() throws IOException
public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException
{
initializeObjectsIfNeeded();
return getObjects().size();

View File

@ -813,9 +813,11 @@ If you see this problem, it's recommended to set `skipOffsetFromLatest` to some
|`maxRowsInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 1000000)|
|`maxBytesInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (1/6 of max JVM memory)|
|`maxTotalRows`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 20000000)|
|`splitHintSpec`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = null)|
|`indexSpec`|See [IndexSpec](../ingestion/index.md#indexspec)|no|
|`maxPendingPersists`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 0 (meaning one persist can be running concurrently with ingestion, and none can be queued up))|
|`pushTimeout`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 0)|
|`maxNumConcurrentSubTasks`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 1)|
### Overlord

View File

@ -100,9 +100,10 @@ Compaction tasks merge all segments of the given interval. The syntax is:
"id": <task_id>,
"dataSource": <task_datasource>,
"ioConfig": <IO config>,
"dimensions" <custom dimensionsSpec>,
"dimensionsSpec" <custom dimensionsSpec>,
"metricsSpec" <custom metricsSpec>,
"segmentGranularity": <segment granularity after compaction>,
"tuningConfig" <index task tuningConfig>,
"tuningConfig" <parallel indexing task tuningConfig>,
"context": <task context>
}
```
@ -116,7 +117,7 @@ Compaction tasks merge all segments of the given interval. The syntax is:
|`dimensionsSpec`|Custom dimensionsSpec. Compaction task will use this dimensionsSpec if specified instead of generating one. See below for more details.|No|
|`metricsSpec`|Custom metricsSpec. Compaction task will use this metricsSpec if specified rather than generating one.|No|
|`segmentGranularity`|If this is set, compactionTask will change the segment granularity for the given interval. See `segmentGranularity` of [`granularitySpec`](index.md#granularityspec) for more details. See the below table for the behavior.|No|
|`tuningConfig`|[Index task tuningConfig](../ingestion/native-batch.md#tuningconfig)|No|
|`tuningConfig`|[Parallel indexing task tuningConfig](../ingestion/native-batch.md#tuningconfig)|No|
|`context`|[Task context](../ingestion/tasks.md#context)|No|

View File

@ -204,6 +204,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and users do not need to set it. This value represents the number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage, not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
|maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting to be pushed. Used in determining when intermediate pushing should occur.|20000000|no|
|numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
|splitHintSpec|Used to give a hint to control the amount of data that each first-phase task reads. This hint could be ignored depending on the firehose implementation. See [SplitHintSpec](#splithintspec) for more details.|null|no|
|partitionsSpec|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false, `hashed` if `forceGuaranteedRollup` = true|no|
|indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](index.md#indexspec)|null|no|
|indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](index.md#indexspec) for possible values.|same as indexSpec|no|
@ -212,7 +213,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
|segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentwriteoutmediumfactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no|
|maxNumConcurrentSubTasks|Maximum number of tasks which can be run in parallel at the same time. The supervisor task would spawn worker tasks up to `maxNumConcurrentSubTasks` regardless of the current available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set to too large, too many worker tasks can be created which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no|
|maxNumConcurrentSubTasks|Maximum number of sub tasks that can be run in parallel at the same time. The supervisor task spawns worker tasks up to `maxNumConcurrentSubTasks` regardless of the currently available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set too large, too many worker tasks may be created, which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no|
|maxRetry|Maximum number of retries on task failures.|3|no|
|maxNumSegmentsToMerge|Max limit for the number of segments that a single task can merge at the same time in the second phase. Used only when `forceGuaranteedRollup` is set.|100|no|
|totalNumMergeTasks|Total number of tasks to merge segments in the second phase when `forceGuaranteedRollup` is set.|10|no|
@ -220,6 +221,22 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no|
|chatHandlerNumRetries|Retries for reporting the pushed segments in worker tasks.|5|no|
### `splitHintSpec`
`SplitHintSpec` is used to give a hint when the supervisor task creates input splits.
Note that each sub task processes a single input split. You can control the amount of data each sub task will read during the first phase.
Currently only one splitHintSpec, i.e., `segments`, is available.
#### `SegmentsSplitHintSpec`
`SegmentsSplitHintSpec` is used only for `IngestSegmentFirehose`.
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `segments`.|none|yes|
|maxInputSegmentBytesPerTask|Maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks).|500MB|no|
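
As a rough illustration of how the new `splitHintSpec` field is expressed (this example is not part of the patch), the snippet below builds a `SegmentsSplitHintSpec` and round-trips it through Jackson; the class name `SplitHintSpecSerdeExample` and the 500MB value are only for demonstration.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.data.input.SplitHintSpec;

public class SplitHintSpecSerdeExample
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // Hint the supervisor task to pack roughly 500MB of input segments into each sub task.
    // Passing null would fall back to the built-in default.
    final SplitHintSpec hint = new SegmentsSplitHintSpec(500L * 1024 * 1024);

    // Serializing against the SplitHintSpec interface includes the polymorphic "type" field,
    // producing something like {"type":"segments","maxInputSegmentBytesPerTask":524288000},
    // which is the shape expected in the tuningConfig's "splitHintSpec" field.
    final String json = mapper.writerFor(SplitHintSpec.class).writeValueAsString(hint);
    System.out.println(json);

    // Reading it back resolves to SegmentsSplitHintSpec via the @JsonSubTypes registration.
    final SplitHintSpec roundTripped = mapper.readValue(json, SplitHintSpec.class);
    System.out.println(roundTripped);
  }
}
```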
### `partitionsSpec`
PartitionsSpec is to describe the secondary partitioning method.
@ -785,7 +802,7 @@ This firehose will accept any type of parser, but will only utilize the list of
|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|filter| See [Filters](../querying/filters.md)|no|
|maxInputSegmentBytesPerTask|When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
|maxInputSegmentBytesPerTask|Deprecated. Use [SegmentsSplitHintSpec](#segmentssplithintspec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). If not specified, the value from the `splitHintSpec` is used.|no|
<a name="sql-firehose"></a>

View File

@ -115,7 +115,7 @@ public class StaticS3FirehoseFactoryTest
5
);
final List<FiniteFirehoseFactory<StringInputRowParser, URI>> subFactories = factory
.getSplits()
.getSplits(null)
.map(factory::withSplit)
.sorted(Comparator.comparing(eachFactory -> {
final StaticS3FirehoseFactory staticS3FirehoseFactory = (StaticS3FirehoseFactory) eachFactory;

View File

@ -31,6 +31,7 @@ import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -52,9 +53,10 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
@ -129,7 +131,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private final Granularity segmentGranularity;
@Nullable
private final IndexTuningConfig tuningConfig;
private final ParallelIndexTuningConfig tuningConfig;
private final ObjectMapper jsonMapper;
@JsonIgnore
private final SegmentProvider segmentProvider;
@ -148,6 +150,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@JsonIgnore
private final CoordinatorClient coordinatorClient;
private final IndexingServiceClient indexingServiceClient;
@JsonIgnore
private final SegmentLoaderFactory segmentLoaderFactory;
@ -160,14 +164,11 @@ public class CompactionTask extends AbstractBatchIndexTask
@JsonIgnore
private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder(
(taskObject, config) -> {
final IndexTask indexTask = (IndexTask) taskObject;
final ParallelIndexSupervisorTask indexTask = (ParallelIndexSupervisorTask) taskObject;
indexTask.stopGracefully(config);
}
);
@JsonIgnore
private List<IndexTask> indexTaskSpecs;
@JsonCreator
public CompactionTask(
@JsonProperty("id") final String id,
@ -180,13 +181,14 @@ public class CompactionTask extends AbstractBatchIndexTask
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity,
@JsonProperty("tuningConfig") @Nullable final IndexTuningConfig tuningConfig,
@JsonProperty("tuningConfig") @Nullable final ParallelIndexTuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject ObjectMapper jsonMapper,
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
@JacksonInject CoordinatorClient coordinatorClient,
@JacksonInject @Nullable IndexingServiceClient indexingServiceClient,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@JacksonInject RetryPolicyFactory retryPolicyFactory,
@JacksonInject AppenderatorsManager appenderatorsManager
@ -222,6 +224,7 @@ public class CompactionTask extends AbstractBatchIndexTask
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
this.indexingServiceClient = indexingServiceClient;
this.coordinatorClient = coordinatorClient;
this.segmentLoaderFactory = segmentLoaderFactory;
this.retryPolicyFactory = retryPolicyFactory;
@ -258,7 +261,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
@JsonProperty
public IndexTuningConfig getTuningConfig()
public ParallelIndexTuningConfig getTuningConfig()
{
return tuningConfig;
}
@ -304,36 +307,36 @@ public class CompactionTask extends AbstractBatchIndexTask
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
if (indexTaskSpecs == null) {
final List<IndexIngestionSpec> ingestionSpecs = createIngestionSchema(
toolbox,
segmentProvider,
partitionConfigurationManager,
dimensionsSpec,
metricsSpec,
segmentGranularity,
jsonMapper,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
);
indexTaskSpecs = IntStream
.range(0, ingestionSpecs.size())
.mapToObj(i -> new IndexTask(
createIndexTaskSpecId(i),
getGroupId(),
getTaskResource(),
getDataSource(),
ingestionSpecs.get(i),
createContextForSubtask(),
authorizerMapper,
chatHandlerProvider,
rowIngestionMetersFactory,
appenderatorsManager
))
.collect(Collectors.toList());
}
final List<ParallelIndexIngestionSpec> ingestionSpecs = createIngestionSchema(
toolbox,
segmentProvider,
partitionConfigurationManager,
dimensionsSpec,
metricsSpec,
segmentGranularity,
jsonMapper,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
);
final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
.range(0, ingestionSpecs.size())
.mapToObj(i -> {
// taskId is used for different purposes in parallel indexing and local indexing.
// In parallel indexing, it's the taskId of the supervisor task. This supervisor taskId must be
// a valid taskId to communicate with sub tasks properly. We use the ID of the compaction task in this case.
//
// In local indexing, it's used as the sequence name for Appenderator. Even though a compaction task can run
// multiple index tasks (one per interval), the appenderator is not shared by those tasks. Each task creates
// a new Appenderator on its own instead. As a result, they should use different sequence names to allocate
// new segmentIds properly. See IndexerSQLMetadataStorageCoordinator.allocatePendingSegments() for details.
// In this case, we use different fake IDs for each created index task.
final String subtaskId = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1
? createIndexTaskSpecId(i)
: getId();
return newTask(subtaskId, ingestionSpecs.get(i));
})
.collect(Collectors.toList());
if (indexTaskSpecs.isEmpty()) {
log.warn("Can't find segments from inputSpec[%s], nothing to do.", ioConfig.getInputSpec());
@ -344,7 +347,7 @@ public class CompactionTask extends AbstractBatchIndexTask
log.info("Generated [%d] compaction task specs", totalNumSpecs);
int failCnt = 0;
for (IndexTask eachSpec : indexTaskSpecs) {
for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) {
final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
if (!currentSubTaskHolder.setTask(eachSpec)) {
log.info("Task is asked to stop. Finish as failed.");
@ -374,10 +377,30 @@ public class CompactionTask extends AbstractBatchIndexTask
}
}
private Map<String, Object> createContextForSubtask()
@VisibleForTesting
ParallelIndexSupervisorTask newTask(String taskId, ParallelIndexIngestionSpec ingestionSpec)
{
return new ParallelIndexSupervisorTask(
taskId,
getGroupId(),
getTaskResource(),
ingestionSpec,
createContextForSubtask(),
indexingServiceClient,
chatHandlerProvider,
authorizerMapper,
rowIngestionMetersFactory,
appenderatorsManager
);
}
@VisibleForTesting
Map<String, Object> createContextForSubtask()
{
final Map<String, Object> newContext = new HashMap<>(getContext());
newContext.put(CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, getId());
// Set the priority of the compaction task.
newContext.put(Tasks.PRIORITY_KEY, getPriority());
return newContext;
}
@ -387,12 +410,12 @@ public class CompactionTask extends AbstractBatchIndexTask
}
/**
* Generate {@link IndexIngestionSpec} from input segments.
* Generate {@link ParallelIndexIngestionSpec} from input segments.
*
* @return an empty list if input segments don't exist. Otherwise, a generated ingestionSpec.
*/
@VisibleForTesting
static List<IndexIngestionSpec> createIngestionSchema(
static List<ParallelIndexIngestionSpec> createIngestionSchema(
final TaskToolbox toolbox,
final SegmentProvider segmentProvider,
final PartitionConfigurationManager partitionConfigurationManager,
@ -424,7 +447,7 @@ public class CompactionTask extends AbstractBatchIndexTask
toolbox.getIndexIO()
);
final IndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig();
final ParallelIndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig();
if (segmentGranularity == null) {
// original granularity
@ -437,7 +460,7 @@ public class CompactionTask extends AbstractBatchIndexTask
.add(p)
);
final List<IndexIngestionSpec> specs = new ArrayList<>(intervalToSegments.size());
final List<ParallelIndexIngestionSpec> specs = new ArrayList<>(intervalToSegments.size());
for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
final Interval interval = entry.getKey();
final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = entry.getValue();
@ -451,7 +474,7 @@ public class CompactionTask extends AbstractBatchIndexTask
);
specs.add(
new IndexIngestionSpec(
new ParallelIndexIngestionSpec(
dataSchema,
createIoConfig(
toolbox,
@ -479,7 +502,7 @@ public class CompactionTask extends AbstractBatchIndexTask
);
return Collections.singletonList(
new IndexIngestionSpec(
new ParallelIndexIngestionSpec(
dataSchema,
createIoConfig(
toolbox,
@ -495,7 +518,7 @@ public class CompactionTask extends AbstractBatchIndexTask
}
}
private static IndexIOConfig createIoConfig(
private static ParallelIndexIOConfig createIoConfig(
TaskToolbox toolbox,
DataSchema dataSchema,
Interval interval,
@ -504,7 +527,7 @@ public class CompactionTask extends AbstractBatchIndexTask
RetryPolicyFactory retryPolicyFactory
)
{
return new IndexIOConfig(
return new ParallelIndexIOConfig(
new IngestSegmentFirehoseFactory(
dataSchema.getDataSource(),
interval,
@ -787,18 +810,18 @@ public class CompactionTask extends AbstractBatchIndexTask
static class PartitionConfigurationManager
{
@Nullable
private final IndexTuningConfig tuningConfig;
private final ParallelIndexTuningConfig tuningConfig;
PartitionConfigurationManager(@Nullable IndexTuningConfig tuningConfig)
PartitionConfigurationManager(@Nullable ParallelIndexTuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
}
@Nullable
IndexTuningConfig computeTuningConfig()
ParallelIndexTuningConfig computeTuningConfig()
{
IndexTuningConfig newTuningConfig = tuningConfig == null
? IndexTuningConfig.createDefault()
ParallelIndexTuningConfig newTuningConfig = tuningConfig == null
? ParallelIndexTuningConfig.defaultConfig()
: tuningConfig;
PartitionsSpec partitionsSpec = newTuningConfig.getGivenOrDefaultPartitionsSpec();
if (partitionsSpec instanceof DynamicPartitionsSpec) {
@ -822,6 +845,7 @@ public class CompactionTask extends AbstractBatchIndexTask
private final AuthorizerMapper authorizerMapper;
private final ChatHandlerProvider chatHandlerProvider;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
private final IndexingServiceClient indexingServiceClient;
private final CoordinatorClient coordinatorClient;
private final SegmentLoaderFactory segmentLoaderFactory;
private final RetryPolicyFactory retryPolicyFactory;
@ -835,7 +859,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private Granularity segmentGranularity;
@Nullable
private IndexTuningConfig tuningConfig;
private ParallelIndexTuningConfig tuningConfig;
@Nullable
private Map<String, Object> context;
@ -845,6 +869,7 @@ public class CompactionTask extends AbstractBatchIndexTask
AuthorizerMapper authorizerMapper,
ChatHandlerProvider chatHandlerProvider,
RowIngestionMetersFactory rowIngestionMetersFactory,
IndexingServiceClient indexingServiceClient,
CoordinatorClient coordinatorClient,
SegmentLoaderFactory segmentLoaderFactory,
RetryPolicyFactory retryPolicyFactory,
@ -856,6 +881,7 @@ public class CompactionTask extends AbstractBatchIndexTask
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
this.indexingServiceClient = indexingServiceClient;
this.coordinatorClient = coordinatorClient;
this.segmentLoaderFactory = segmentLoaderFactory;
this.retryPolicyFactory = retryPolicyFactory;
@ -896,7 +922,7 @@ public class CompactionTask extends AbstractBatchIndexTask
return this;
}
public Builder tuningConfig(IndexTuningConfig tuningConfig)
public Builder tuningConfig(ParallelIndexTuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
return this;
@ -928,6 +954,7 @@ public class CompactionTask extends AbstractBatchIndexTask
chatHandlerProvider,
rowIngestionMetersFactory,
coordinatorClient,
indexingServiceClient,
segmentLoaderFactory,
retryPolicyFactory,
appenderatorsManager

View File

@ -28,6 +28,7 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
@ -196,7 +197,10 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
(SinglePhaseSubTaskSpec) taskCompleteEvent.getSpec();
LOG.error(
"Failed to run sub tasks for inputSplits[%s]",
getSplitsIfSplittable(spec.getIngestionSpec().getIOConfig().getFirehoseFactory())
getSplitsIfSplittable(
spec.getIngestionSpec().getIOConfig().getFirehoseFactory(),
tuningConfig.getSplitHintSpec()
)
);
}
break;
@ -250,11 +254,14 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
);
}
private static List<InputSplit> getSplitsIfSplittable(FirehoseFactory firehoseFactory) throws IOException
private static List<InputSplit> getSplitsIfSplittable(
FirehoseFactory firehoseFactory,
@Nullable SplitHintSpec splitHintSpec
) throws IOException
{
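// Only FiniteFirehoseFactory implementations can enumerate their splits; the split hint is forwarded to them.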
if (firehoseFactory instanceof FiniteFirehoseFactory) {
final FiniteFirehoseFactory<?, ?> finiteFirehoseFactory = (FiniteFirehoseFactory) firehoseFactory;
return finiteFirehoseFactory.getSplits().collect(Collectors.toList());
return finiteFirehoseFactory.getSplits(splitHintSpec).collect(Collectors.toList());
} else {
throw new ISE("firehoseFactory[%s] is not splittable", firehoseFactory.getClass().getSimpleName());
}

View File

@ -158,6 +158,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@JsonCreator
public ParallelIndexSupervisorTask(
@JsonProperty("id") String id,
@JsonProperty("groupId") @Nullable String groupId,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("spec") ParallelIndexIngestionSpec ingestionSchema,
@JsonProperty("context") Map<String, Object> context,
@ -170,7 +171,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
{
super(
getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
null,
groupId,
taskResource,
ingestionSchema.getDataSchema().getDataSource(),
context
@ -396,10 +397,12 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
"firehoseFactory[%s] is not splittable. Running sequentially.",
baseFirehoseFactory.getClass().getSimpleName()
);
} else if (ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() == 1) {
} else if (ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() <= 1) {
LOG.warn(
"maxNumConcurrentSubTasks is 1. Running sequentially. "
+ "Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel ingestion mode."
"maxNumConcurrentSubTasks[%s] is less than or equal to 1. Running sequentially. "
+ "Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel "
+ "ingestion mode.",
ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks()
);
} else {
throw new ISE("Unknown reason for sequentail mode. Failing this task.");

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.java.util.common.IAE;
@ -46,6 +47,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
private static final int DEFAULT_MAX_NUM_SEGMENTS_TO_MERGE = 100;
private static final int DEFAULT_TOTAL_NUM_MERGE_TASKS = 10;
private final SplitHintSpec splitHintSpec;
private final int maxNumConcurrentSubTasks;
private final int maxRetry;
private final long taskStatusCheckPeriodMs;
@ -67,7 +70,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
*/
private final int totalNumMergeTasks;
static ParallelIndexTuningConfig defaultConfig()
public static ParallelIndexTuningConfig defaultConfig()
{
return new ParallelIndexTuningConfig(
null,
@ -94,6 +97,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
null,
null,
null,
null,
null
);
}
@ -106,6 +110,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@JsonProperty("numShards") @Deprecated @Nullable Integer numShards,
@JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec,
@JsonProperty("partitionsSpec") @Nullable PartitionsSpec partitionsSpec,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@ -117,7 +122,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
@JsonProperty("maxNumSubTasks") @Deprecated @Nullable Integer maxNumSubTasks,
@JsonProperty("maxNumConcurrentSubTasks") @Nullable Integer maxNumConcurrentSubTasks,
@JsonProperty("maxRetry") @Nullable Integer maxRetry,
@JsonProperty("taskStatusCheckPeriodMs") @Nullable Integer taskStatusCheckPeriodMs,
@JsonProperty("taskStatusCheckPeriodMs") @Nullable Long taskStatusCheckPeriodMs,
@JsonProperty("chatHandlerTimeout") @Nullable Duration chatHandlerTimeout,
@JsonProperty("chatHandlerNumRetries") @Nullable Integer chatHandlerNumRetries,
@JsonProperty("maxNumSegmentsToMerge") @Nullable Integer maxNumSegmentsToMerge,
@ -154,6 +159,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
throw new IAE("Can't use both maxNumSubTasks and maxNumConcurrentSubTasks. Use maxNumConcurrentSubTasks instead");
}
this.splitHintSpec = splitHintSpec;
if (maxNumConcurrentSubTasks == null) {
this.maxNumConcurrentSubTasks = maxNumSubTasks == null ? DEFAULT_MAX_NUM_CONCURRENT_SUB_TASKS : maxNumSubTasks;
} else {
@ -182,6 +189,13 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
Preconditions.checkArgument(this.totalNumMergeTasks > 0, "totalNumMergeTasks must be positive");
}
@Nullable
@JsonProperty
public SplitHintSpec getSplitHintSpec()
{
return splitHintSpec;
}
@JsonProperty
public int getMaxNumConcurrentSubTasks()
{
@ -224,6 +238,39 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
return totalNumMergeTasks;
}
@Override
public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
{
return new ParallelIndexTuningConfig(
null,
null,
getMaxRowsInMemory(),
getMaxBytesInMemory(),
null,
null,
getSplitHintSpec(),
partitionsSpec,
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
getMaxPendingPersists(),
isForceGuaranteedRollup(),
isReportParseExceptions(),
getPushTimeout(),
getSegmentWriteOutMediumFactory(),
null,
getMaxNumConcurrentSubTasks(),
getMaxRetry(),
getTaskStatusCheckPeriodMs(),
getChatHandlerTimeout(),
getChatHandlerNumRetries(),
getMaxNumSegmentsToMerge(),
getTotalNumMergeTasks(),
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions()
);
}
@Override
public boolean equals(Object o)
{
@ -243,6 +290,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
chatHandlerNumRetries == that.chatHandlerNumRetries &&
maxNumSegmentsToMerge == that.maxNumSegmentsToMerge &&
totalNumMergeTasks == that.totalNumMergeTasks &&
Objects.equals(splitHintSpec, that.splitHintSpec) &&
Objects.equals(chatHandlerTimeout, that.chatHandlerTimeout);
}
@ -251,6 +299,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
{
return Objects.hash(
super.hashCode(),
splitHintSpec,
maxNumConcurrentSubTasks,
maxRetry,
taskStatusCheckPeriodMs,
@ -260,4 +309,19 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
totalNumMergeTasks
);
}
@Override
public String toString()
{
return "ParallelIndexTuningConfig{" +
"splitHintSpec=" + splitHintSpec +
", maxNumConcurrentSubTasks=" + maxNumConcurrentSubTasks +
", maxRetry=" + maxRetry +
", taskStatusCheckPeriodMs=" + taskStatusCheckPeriodMs +
", chatHandlerTimeout=" + chatHandlerTimeout +
", chatHandlerNumRetries=" + chatHandlerNumRetries +
", maxNumSegmentsToMerge=" + maxNumSegmentsToMerge +
", totalNumMergeTasks=" + totalNumMergeTasks +
"} " + super.toString();
}
}

View File

@ -70,13 +70,13 @@ class PartialSegmentGenerateParallelIndexTaskRunner
@Override
Iterator<SubTaskSpec<PartialSegmentGenerateTask>> subTaskSpecIterator() throws IOException
{
return baseFirehoseFactory.getSplits().map(this::newTaskSpec).iterator();
return baseFirehoseFactory.getSplits(getTuningConfig().getSplitHintSpec()).map(this::newTaskSpec).iterator();
}
@Override
int getTotalNumSubTasks() throws IOException
{
return baseFirehoseFactory.getNumSplits();
return baseFirehoseFactory.getNumSplits(getTuningConfig().getSplitHintSpec());
}
@VisibleForTesting

View File

@ -79,13 +79,13 @@ class SinglePhaseParallelIndexTaskRunner
@Override
Iterator<SubTaskSpec<SinglePhaseSubTask>> subTaskSpecIterator() throws IOException
{
return baseFirehoseFactory.getSplits().map(this::newTaskSpec).iterator();
return baseFirehoseFactory.getSplits(getTuningConfig().getSplitHintSpec()).map(this::newTaskSpec).iterator();
}
@Override
int getTotalNumSubTasks() throws IOException
{
return baseFirehoseFactory.getNumSplits();
return baseFirehoseFactory.getNumSplits(getTuningConfig().getSplitHintSpec());
}
@VisibleForTesting

View File

@ -34,6 +34,8 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexing.common.RetryPolicy;
import org.apache.druid.indexing.common.RetryPolicyFactory;
@ -78,7 +80,6 @@ import java.util.stream.Stream;
public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>>
{
private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class);
private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 150 * 1024 * 1024;
private final String dataSource;
// Exactly one of interval and segmentIds should be non-null. Typically 'interval' is specified directly
// by the user creating this firehose and 'segmentIds' is used for sub-tasks if it is split for parallel
@ -90,7 +91,8 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
private final DimFilter dimFilter;
private final List<String> dimensions;
private final List<String> metrics;
private final long maxInputSegmentBytesPerTask;
@Nullable
private final Long maxInputSegmentBytesPerTask;
private final IndexIO indexIO;
private final CoordinatorClient coordinatorClient;
private final SegmentLoaderFactory segmentLoaderFactory;
@ -108,7 +110,7 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics,
@JsonProperty("maxInputSegmentBytesPerTask") Long maxInputSegmentBytesPerTask,
@JsonProperty("maxInputSegmentBytesPerTask") @Deprecated @Nullable Long maxInputSegmentBytesPerTask,
@JacksonInject IndexIO indexIO,
@JacksonInject CoordinatorClient coordinatorClient,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@ -125,9 +127,7 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
this.dimFilter = dimFilter;
this.dimensions = dimensions;
this.metrics = metrics;
this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null
? DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK
: maxInputSegmentBytesPerTask;
this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask;
this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
@ -190,8 +190,9 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
return metrics;
}
@Nullable
@JsonProperty
public long getMaxInputSegmentBytesPerTask()
public Long getMaxInputSegmentBytesPerTask()
{
return maxInputSegmentBytesPerTask;
}
@ -388,12 +389,26 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
return new ArrayList<>(timeline.values());
}
private void initializeSplitsIfNeeded()
private void initializeSplitsIfNeeded(@Nullable SplitHintSpec splitHintSpec)
{
if (splits != null) {
return;
}
final SegmentsSplitHintSpec nonNullSplitHintSpec;
if (!(splitHintSpec instanceof SegmentsSplitHintSpec)) {
if (splitHintSpec != null) {
log.warn("Given splitHintSpec[%s] is not a SegmentsSplitHintSpec. Ingoring it.", splitHintSpec);
}
nonNullSplitHintSpec = new SegmentsSplitHintSpec(null);
} else {
nonNullSplitHintSpec = (SegmentsSplitHintSpec) splitHintSpec;
}
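// The deprecated per-firehose maxInputSegmentBytesPerTask, if set, takes precedence over the hint.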
final long maxInputSegmentBytesPerTask = this.maxInputSegmentBytesPerTask == null
? nonNullSplitHintSpec.getMaxInputSegmentBytesPerTask()
: this.maxInputSegmentBytesPerTask;
// isSplittable() ensures this is only called when we have an interval.
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = getTimelineForInterval();
@ -456,16 +471,16 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
}
@Override
public Stream<InputSplit<List<WindowedSegmentId>>> getSplits()
public Stream<InputSplit<List<WindowedSegmentId>>> getSplits(@Nullable SplitHintSpec splitHintSpec)
{
initializeSplitsIfNeeded();
initializeSplitsIfNeeded(splitHintSpec);
return splits.stream();
}
@Override
public int getNumSplits()
public int getNumSplits(@Nullable SplitHintSpec splitHintSpec)
{
initializeSplitsIfNeeded();
initializeSplitsIfNeeded(splitHintSpec);
return splits.size();
}

View File

@ -27,6 +27,9 @@ import org.apache.druid.client.indexing.ClientCompactQuery;
import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
@ -74,6 +77,7 @@ public class ClientCompactQuerySerdeTest
40000,
2000L,
30000L,
new SegmentsSplitHintSpec(100000L),
new IndexSpec(
new DefaultBitmapSerdeFactory(),
CompressionStrategy.LZ4,
@ -81,7 +85,8 @@ public class ClientCompactQuerySerdeTest
LongEncodingStrategy.LONGS
),
null,
1000L
1000L,
100
),
new HashMap<>()
);
@ -100,22 +105,36 @@ public class ClientCompactQuerySerdeTest
((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds()
);
Assert.assertEquals(
query.getTuningConfig().getMaxRowsInMemory().intValue(), task.getTuningConfig().getMaxRowsInMemory()
query.getTuningConfig().getMaxRowsInMemory().intValue(),
task.getTuningConfig().getMaxRowsInMemory()
);
Assert.assertEquals(
query.getTuningConfig().getMaxBytesInMemory().longValue(), task.getTuningConfig().getMaxBytesInMemory()
query.getTuningConfig().getMaxBytesInMemory().longValue(),
task.getTuningConfig().getMaxBytesInMemory()
);
Assert.assertEquals(
query.getTuningConfig().getMaxRowsPerSegment(), task.getTuningConfig().getMaxRowsPerSegment()
query.getTuningConfig().getMaxRowsPerSegment(),
task.getTuningConfig().getMaxRowsPerSegment()
);
Assert.assertEquals(
query.getTuningConfig().getMaxTotalRows(), task.getTuningConfig().getMaxTotalRows()
query.getTuningConfig().getMaxTotalRows(),
task.getTuningConfig().getMaxTotalRows()
);
Assert.assertEquals(
query.getTuningConfig().getIndexSpec(), task.getTuningConfig().getIndexSpec()
query.getTuningConfig().getSplitHintSpec(),
task.getTuningConfig().getSplitHintSpec()
);
Assert.assertEquals(
query.getTuningConfig().getPushTimeout().longValue(), task.getTuningConfig().getPushTimeout()
query.getTuningConfig().getIndexSpec(),
task.getTuningConfig().getIndexSpec()
);
Assert.assertEquals(
query.getTuningConfig().getPushTimeout().longValue(),
task.getTuningConfig().getPushTimeout()
);
Assert.assertEquals(
query.getTuningConfig().getMaxNumConcurrentSubTasks().intValue(),
task.getTuningConfig().getMaxNumConcurrentSubTasks()
);
Assert.assertEquals(query.getContext(), task.getContext());
}
@ -143,6 +162,7 @@ public class ClientCompactQuerySerdeTest
binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
binder.bind(IndexingServiceClient.class).toInstance(new NoopIndexingServiceClient());
}
)
)

View File

@ -0,0 +1,316 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractParallelIndexSupervisorTaskTest;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexingTest.TestSupervisorTask;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervisorTaskTest
{
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{LockGranularity.TIME_CHUNK},
new Object[]{LockGranularity.SEGMENT}
);
}
private static final String DATA_SOURCE = "test";
private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
private final AppenderatorsManager appenderatorsManager = new TestAppenderatorsManager();
private final LockGranularity lockGranularity;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
private final CoordinatorClient coordinatorClient;
public CompactionTaskParallelRunTest(LockGranularity lockGranularity)
{
this.lockGranularity = lockGranularity;
this.rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
coordinatorClient = new CoordinatorClient(null, null)
{
@Override
public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource, List<Interval> intervals)
{
return getStorageCoordinator().getUsedSegmentsForIntervals(dataSource, intervals);
}
@Override
public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String segmentId)
{
ImmutableDruidDataSource druidDataSource = getMetadataSegmentManager().getImmutableDataSourceWithUsedSegments(
dataSource
);
if (druidDataSource == null) {
throw new ISE("Unknown datasource[%s]", dataSource);
}
for (SegmentId possibleSegmentId : SegmentId.iteratePossibleParsingsWithDataSource(dataSource, segmentId)) {
DataSegment segment = druidDataSource.getSegment(possibleSegmentId);
if (segment != null) {
return segment;
}
}
throw new ISE("Can't find segment for id[%s]", segmentId);
}
};
}
@Before
public void setup() throws IOException
{
indexingServiceClient = new LocalIndexingServiceClient();
localDeepStorage = temporaryFolder.newFolder();
}
@After
public void teardown()
{
indexingServiceClient.shutdown();
temporaryFolder.delete();
}
@Test
public void testRunParallel() throws Exception
{
runIndexTask();
final CompactionTask compactionTask = new TestCompactionTask(
null,
null,
DATA_SOURCE,
new CompactionIOConfig(new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null)),
null,
null,
null,
null,
newTuningConfig(),
null,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
coordinatorClient,
indexingServiceClient,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY,
appenderatorsManager
);
runTask(compactionTask);
}
private void runIndexTask() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T00:00:10Z,b,2\n");
writer.write("2014-01-01T00:00:10Z,c,3\n");
writer.write("2014-01-01T01:00:20Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,2\n");
writer.write("2014-01-01T01:00:20Z,c,3\n");
writer.write("2014-01-01T02:00:30Z,a,1\n");
writer.write("2014-01-01T02:00:30Z,b,2\n");
writer.write("2014-01-01T02:00:30Z,c,3\n");
}
IndexTask indexTask = new IndexTask(
null,
null,
IndexTaskTest.createIngestionSpec(
getObjectMapper(),
tmpDir,
CompactionTaskRunTest.DEFAULT_PARSE_SPEC,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, null, false, true),
false
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
appenderatorsManager
);
runTask(indexTask);
}
private void runTask(Task task) throws Exception
{
actionClient = createActionClient(task);
toolbox = createTaskToolbox(task);
prepareTaskForLocking(task);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertTrue(task.isReady(actionClient));
Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
shutdownTask(task);
}
private static ParallelIndexTuningConfig newTuningConfig()
{
return new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
2,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
}
private static class TestCompactionTask extends CompactionTask
{
private final IndexingServiceClient indexingServiceClient;
TestCompactionTask(
String id,
TaskResource taskResource,
String dataSource,
@Nullable CompactionIOConfig ioConfig,
@Nullable DimensionsSpec dimensions,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable AggregatorFactory[] metricsSpec,
@Nullable Granularity segmentGranularity,
@Nullable ParallelIndexTuningConfig tuningConfig,
@Nullable Map<String, Object> context,
ObjectMapper jsonMapper,
AuthorizerMapper authorizerMapper,
ChatHandlerProvider chatHandlerProvider,
RowIngestionMetersFactory rowIngestionMetersFactory,
CoordinatorClient coordinatorClient,
@Nullable IndexingServiceClient indexingServiceClient,
SegmentLoaderFactory segmentLoaderFactory,
RetryPolicyFactory retryPolicyFactory,
AppenderatorsManager appenderatorsManager
)
{
super(
id,
taskResource,
dataSource,
null,
null,
ioConfig,
dimensions,
dimensionsSpec,
metricsSpec,
segmentGranularity,
tuningConfig,
context,
jsonMapper,
authorizerMapper,
chatHandlerProvider,
rowIngestionMetersFactory,
coordinatorClient,
indexingServiceClient,
segmentLoaderFactory,
retryPolicyFactory,
appenderatorsManager
);
this.indexingServiceClient = indexingServiceClient;
}
@Override
ParallelIndexSupervisorTask newTask(String taskId, ParallelIndexIngestionSpec ingestionSpec)
{
return new TestSupervisorTask(
taskId,
null,
ingestionSpec,
createContextForSubtask(),
indexingServiceClient
);
}
}
}

View File

@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
@ -56,6 +58,7 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
@ -76,6 +79,7 @@ import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@ -90,15 +94,13 @@ import java.util.concurrent.Future;
@RunWith(Parameterized.class)
public class CompactionTaskRunTest extends IngestionTestBase
{
public static final String DATA_SOURCE = "test";
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
public static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
@ -137,18 +139,23 @@ public class CompactionTaskRunTest extends IngestionTestBase
);
}
private static final String DATA_SOURCE = "test";
private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
private final RowIngestionMetersFactory rowIngestionMetersFactory;
private final IndexingServiceClient indexingServiceClient;
private final CoordinatorClient coordinatorClient;
private final SegmentLoaderFactory segmentLoaderFactory;
private final LockGranularity lockGranularity;
private final AppenderatorsManager appenderatorsManager;
private ExecutorService exec;
private AppenderatorsManager appenderatorsManager;
private File localDeepStorage;
public CompactionTaskRunTest(LockGranularity lockGranularity)
{
TestUtils testUtils = new TestUtils();
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
indexingServiceClient = new NoopIndexingServiceClient();
coordinatorClient = new CoordinatorClient(null, null)
{
@Override
@ -163,15 +170,17 @@ public class CompactionTaskRunTest extends IngestionTestBase
}
@Before
public void setup()
public void setup() throws IOException
{
exec = Execs.multiThreaded(2, "compaction-task-run-test-%d");
localDeepStorage = temporaryFolder.newFolder();
}
@After
public void teardown()
{
exec.shutdownNow();
temporaryFolder.delete();
}
@Test
@ -183,8 +192,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
DATA_SOURCE,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
indexingServiceClient,
coordinatorClient,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
@ -228,8 +238,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
DATA_SOURCE,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
indexingServiceClient,
coordinatorClient,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
@ -306,8 +317,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
DATA_SOURCE,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
indexingServiceClient,
coordinatorClient,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
@ -350,7 +362,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
appenderatorsManager
);
@ -404,8 +416,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
DATA_SOURCE,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
indexingServiceClient,
coordinatorClient,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
@ -459,8 +472,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
DATA_SOURCE,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
indexingServiceClient,
coordinatorClient,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
@ -506,8 +520,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
DATA_SOURCE,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
indexingServiceClient,
coordinatorClient,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
@ -564,8 +579,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
DATA_SOURCE,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
indexingServiceClient,
coordinatorClient,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
@ -675,7 +691,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
appenderatorsManager
);
@ -696,31 +712,54 @@ public class CompactionTaskRunTest extends IngestionTestBase
{
getLockbox().add(task);
getTaskStorage().insert(task, TaskStatus.running(task.getId()));
final TestLocalTaskActionClient actionClient = createActionClient(task);
final File deepStorageDir = temporaryFolder.newFolder();
final ObjectMapper objectMapper = getObjectMapper();
objectMapper.registerSubtypes(
new NamedType(LocalLoadSpec.class, "local")
);
objectMapper.registerSubtypes(LocalDataSegmentPuller.class);
final TaskToolbox box = createTaskToolbox(objectMapper, task);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
task.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true);
if (task.isReady(box.getTaskActionClient())) {
if (readyLatchToCountDown != null) {
readyLatchToCountDown.countDown();
}
if (latchToAwaitBeforeRun != null) {
latchToAwaitBeforeRun.await();
}
TaskStatus status = task.run(box);
shutdownTask(task);
final List<DataSegment> segments = new ArrayList<>(
((TestLocalTaskActionClient) box.getTaskActionClient()).getPublishedSegments()
);
Collections.sort(segments);
return Pair.of(status, segments);
} else {
throw new ISE("task[%s] is not ready", task.getId());
}
}
private TaskToolbox createTaskToolbox(ObjectMapper objectMapper, Task task) throws IOException
{
final SegmentLoader loader = new SegmentLoaderLocalCacheManager(
getIndexIO(),
new SegmentLoaderConfig() {
@Override
public List<StorageLocationConfig> getLocations()
{
return ImmutableList.of(new StorageLocationConfig(deepStorageDir, null, null));
return ImmutableList.of(new StorageLocationConfig(localDeepStorage, null, null));
}
},
objectMapper
);
final TaskToolbox box = new TaskToolbox(
return new TaskToolbox(
null,
null,
actionClient,
createActionClient(task),
null,
new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()),
new NoopDataSegmentKiller(),
@ -747,23 +786,5 @@ public class CompactionTaskRunTest extends IngestionTestBase
new NoopTestTaskReportFileWriter(),
null
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
task.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true);
if (task.isReady(box.getTaskActionClient())) {
if (readyLatchToCountDown != null) {
readyLatchToCountDown.countDown();
}
if (latchToAwaitBeforeRun != null) {
latchToAwaitBeforeRun.await();
}
TaskStatus status = task.run(box);
shutdownTask(task);
final List<DataSegment> segments = new ArrayList<>(actionClient.getPublishedSegments());
Collections.sort(segments);
return Pair.of(status, segments);
} else {
throw new ISE("task[%s] is not ready", task.getId());
}
}
}

View File

@ -29,6 +29,8 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -55,9 +57,9 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager;
import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
@ -148,12 +150,13 @@ public class CompactionTaskTest
Intervals.of("2017-06-01/2017-07-01")
);
private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = new HashMap<>();
private static final IndexTuningConfig TUNING_CONFIG = createTuningConfig();
private static final ParallelIndexTuningConfig TUNING_CONFIG = createTuningConfig();
private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils()
.getRowIngestionMetersFactory();
private static final Map<DataSegment, File> SEGMENT_MAP = new HashMap<>();
private static final CoordinatorClient COORDINATOR_CLIENT = new TestCoordinatorClient(SEGMENT_MAP);
private static IndexingServiceClient INDEXING_SERVICE_CLIENT = new NoopIndexingServiceClient();
private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager();
private static final ObjectMapper OBJECT_MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper());
private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
@ -250,6 +253,7 @@ public class CompactionTaskTest
binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
binder.bind(IndexingServiceClient.class).toInstance(INDEXING_SERVICE_CLIENT);
}
)
)
@ -277,9 +281,9 @@ public class CompactionTaskTest
return dimensions;
}
private static IndexTuningConfig createTuningConfig()
private static ParallelIndexTuningConfig createTuningConfig()
{
return new IndexTuningConfig(
return new ParallelIndexTuningConfig(
null,
null, // null to compute maxRowsPerSegment automatically
500000,
@ -288,7 +292,6 @@ public class CompactionTaskTest
null,
null,
null,
null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@ -296,11 +299,18 @@ public class CompactionTaskTest
LongEncodingStrategy.LONGS
),
null,
5000,
null,
true,
false,
5000L,
null,
null,
null,
null,
null,
null,
null,
null,
100L,
null,
null,
null,
@ -332,6 +342,7 @@ public class CompactionTaskTest
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
ROW_INGESTION_METERS_FACTORY,
INDEXING_SERVICE_CLIENT,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
@ -359,6 +370,7 @@ public class CompactionTaskTest
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
ROW_INGESTION_METERS_FACTORY,
INDEXING_SERVICE_CLIENT,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
@ -384,6 +396,7 @@ public class CompactionTaskTest
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
ROW_INGESTION_METERS_FACTORY,
INDEXING_SERVICE_CLIENT,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
@ -424,7 +437,7 @@ public class CompactionTaskTest
@Test
public void testCreateIngestionSchema() throws IOException, SegmentLoadingException
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
@ -457,8 +470,8 @@ public class CompactionTaskTest
@Test
public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOException, SegmentLoadingException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
100000,
null,
500000,
1000000L,
@ -466,7 +479,6 @@ public class CompactionTaskTest
null,
null,
null,
new HashedPartitionsSpec(6, null, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@ -474,17 +486,24 @@ public class CompactionTaskTest
LongEncodingStrategy.LONGS
),
null,
5000,
null,
true,
false,
null,
100L,
null,
null,
10,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
@ -518,16 +537,15 @@ public class CompactionTaskTest
@Test
public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, SegmentLoadingException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
500000,
1000000L,
1000000L,
null,
null,
null,
null,
new HashedPartitionsSpec(null, 6, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@ -535,17 +553,24 @@ public class CompactionTaskTest
LongEncodingStrategy.LONGS
),
null,
5000,
true,
false,
null,
100L,
false,
false,
5000L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
@ -579,7 +604,7 @@ public class CompactionTaskTest
@Test
public void testCreateIngestionSchemaWithNumShards() throws IOException, SegmentLoadingException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
500000,
@ -587,7 +612,6 @@ public class CompactionTaskTest
null,
null,
null,
null,
new HashedPartitionsSpec(null, 3, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@ -596,17 +620,24 @@ public class CompactionTaskTest
LongEncodingStrategy.LONGS
),
null,
5000,
null,
true,
false,
5000L,
null,
null,
10,
null,
null,
null,
null,
null,
100L,
null,
null,
null,
null
);
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
@ -667,7 +698,7 @@ public class CompactionTaskTest
)
);
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
@ -708,7 +739,7 @@ public class CompactionTaskTest
new DoubleMaxAggregatorFactory("custom_double_max", "agg_4")
};
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
@ -742,7 +773,7 @@ public class CompactionTaskTest
@Test
public void testCreateIngestionSchemaWithCustomSegments() throws IOException, SegmentLoadingException
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)),
new PartitionConfigurationManager(TUNING_CONFIG),
@ -831,6 +862,7 @@ public class CompactionTaskTest
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
ROW_INGESTION_METERS_FACTORY,
INDEXING_SERVICE_CLIENT,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
@ -845,7 +877,7 @@ public class CompactionTaskTest
@Test
public void testSegmentGranularity() throws IOException, SegmentLoadingException
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
@ -880,7 +912,7 @@ public class CompactionTaskTest
@Test
public void testNullSegmentGranularityAnd() throws IOException, SegmentLoadingException
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
@ -955,7 +987,7 @@ public class CompactionTaskTest
}
private void assertIngestionSchema(
List<IndexIngestionSpec> ingestionSchemas,
List<ParallelIndexIngestionSpec> ingestionSchemas,
List<DimensionsSpec> expectedDimensionsSpecs,
List<AggregatorFactory> expectedMetricsSpec,
List<Interval> expectedSegmentIntervals,
@ -967,7 +999,7 @@ public class CompactionTaskTest
expectedDimensionsSpecs,
expectedMetricsSpec,
expectedSegmentIntervals,
new IndexTuningConfig(
new ParallelIndexTuningConfig(
null,
null,
500000,
@ -975,8 +1007,7 @@ public class CompactionTaskTest
Long.MAX_VALUE,
null,
null,
null,
new HashedPartitionsSpec(null, null, null), // automatically computed targetPartitionSize
new HashedPartitionsSpec(5000000, null, null), // automatically computed targetPartitionSize
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@ -984,11 +1015,18 @@ public class CompactionTaskTest
LongEncodingStrategy.LONGS
),
null,
5000,
null,
true,
false,
5000L,
null,
null,
null,
null,
null,
null,
null,
null,
100L,
null,
null,
null,
@ -999,11 +1037,11 @@ public class CompactionTaskTest
}
private void assertIngestionSchema(
List<IndexIngestionSpec> ingestionSchemas,
List<ParallelIndexIngestionSpec> ingestionSchemas,
List<DimensionsSpec> expectedDimensionsSpecs,
List<AggregatorFactory> expectedMetricsSpec,
List<Interval> expectedSegmentIntervals,
IndexTuningConfig expectedTuningConfig,
ParallelIndexTuningConfig expectedTuningConfig,
Granularity expectedSegmentGranularity
)
{
@ -1015,7 +1053,7 @@ public class CompactionTaskTest
);
for (int i = 0; i < ingestionSchemas.size(); i++) {
final IndexIngestionSpec ingestionSchema = ingestionSchemas.get(i);
final ParallelIndexIngestionSpec ingestionSchema = ingestionSchemas.get(i);
final DimensionsSpec expectedDimensionsSpec = expectedDimensionsSpecs.get(i);
// assert dataSchema
@ -1048,7 +1086,7 @@ public class CompactionTaskTest
);
// assert ioConfig
final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
Assert.assertFalse(ioConfig.isAppendToExisting());
final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);

View File

@ -24,6 +24,7 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
@ -58,6 +59,7 @@ import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
@ -86,6 +88,7 @@ public abstract class IngestionTestBase
private final TestUtils testUtils = new TestUtils();
private final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
private SegmentLoaderFactory segmentLoaderFactory;
private TaskStorage taskStorage;
private IndexerSQLMetadataStorageCoordinator storageCoordinator;
private MetadataSegmentManager segmentManager;
@ -112,6 +115,7 @@ public abstract class IngestionTestBase
derbyConnectorRule.getConnector()
);
lockbox = new TaskLockbox(taskStorage, storageCoordinator);
segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper());
}
@After
@ -136,6 +140,11 @@ public abstract class IngestionTestBase
lockbox.remove(task);
}
public SegmentLoader newSegmentLoader(File storageDir)
{
return segmentLoaderFactory.manufacturate(storageDir);
}
public ObjectMapper getObjectMapper()
{
return objectMapper;
@ -146,6 +155,11 @@ public abstract class IngestionTestBase
return taskStorage;
}
public SegmentLoaderFactory getSegmentLoaderFactory()
{
return segmentLoaderFactory;
}
public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator()
{
return storageCoordinator;

View File

@ -99,10 +99,10 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
0
);
TestLocalTaskActionClient actionClient;
LocalIndexingServiceClient indexingServiceClient;
TaskToolbox toolbox;
File localDeepStorage;
protected TestLocalTaskActionClient actionClient;
protected LocalIndexingServiceClient indexingServiceClient;
protected TaskToolbox toolbox;
protected File localDeepStorage;
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -128,7 +128,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
);
}
class LocalIndexingServiceClient extends NoopIndexingServiceClient
public class LocalIndexingServiceClient extends NoopIndexingServiceClient
{
private final ConcurrentMap<String, Future<TaskStatus>> tasks = new ConcurrentHashMap<>();
private final ListeningExecutorService service = MoreExecutors.listeningDecorator(
@ -246,13 +246,13 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
}
}
void shutdown()
public void shutdown()
{
service.shutdownNow();
}
}
TaskToolbox createTaskToolbox(Task task) throws IOException
protected TaskToolbox createTaskToolbox(Task task) throws IOException
{
return new TaskToolbox(
null,
@ -278,7 +278,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
null,
null,
null,
newSegmentLoader(temporaryFolder.newFolder()),
getObjectMapper(),
temporaryFolder.newFolder(task.getId()),
getIndexIO(),
@ -307,6 +307,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
{
super(
id,
null,
taskResource,
ingestionSchema,
context,

View File

@ -192,6 +192,7 @@ public class MultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervi
null,
null,
null,
null,
partitionsSpec,
null,
null,

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@ -161,7 +162,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
)
{
final TestFirehoseFactory firehoseFactory = (TestFirehoseFactory) ioConfig.getFirehoseFactory();
final int numTotalSubTasks = firehoseFactory.getNumSplits();
final int numTotalSubTasks = firehoseFactory.getNumSplits(null);
// set up ingestion spec
final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec(
new DataSchema(
@ -201,6 +202,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
null,
null,
null,
null,
numTotalSubTasks,
null,
null,
@ -256,13 +258,13 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
}
@Override
public Stream<InputSplit<TestInput>> getSplits()
public Stream<InputSplit<TestInput>> getSplits(@Nullable SplitHintSpec splitHintSpec)
{
return splits.stream();
}
@Override
public int getNumSplits()
public int getNumSplits(@Nullable SplitHintSpec splitHintSpec)
{
return splits.size();
}

View File

@ -25,6 +25,7 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
@ -432,6 +433,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
null,
null,
null,
null,
NUM_SUB_TASKS,
null,
null,
@ -465,13 +467,13 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
}
@Override
public Stream<InputSplit<Integer>> getSplits()
public Stream<InputSplit<Integer>> getSplits(@Nullable SplitHintSpec splitHintSpec)
{
return ids.stream().map(InputSplit::new);
}
@Override
public int getNumSplits()
public int getNumSplits(@Nullable SplitHintSpec splitHintSpec)
{
return ids.size();
}
@ -596,7 +598,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
new LocalParallelIndexTaskClientFactory(supervisorTask)
);
final TestFirehose firehose = (TestFirehose) getIngestionSpec().getIOConfig().getFirehoseFactory();
final InputSplit<Integer> split = firehose.getSplits().findFirst().orElse(null);
final InputSplit<Integer> split = firehose.getSplits(null).findFirst().orElse(null);
if (split == null) {
throw new ISE("Split is null");
}

View File

@ -156,6 +156,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
{
return new ParallelIndexSupervisorTask(
ID,
null,
taskResource,
ingestionSpec,
context,
@ -248,6 +249,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
null,
null,
null,
null,
partitionsSpec,
null,
null,

View File

@ -71,6 +71,7 @@ public class ParallelIndexTuningConfigTest
1000L,
null,
null,
null,
new DynamicPartitionsSpec(100, 100L),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@ -87,7 +88,7 @@ public class ParallelIndexTuningConfigTest
null,
250,
100,
20,
20L,
new Duration(3600),
128,
null,
@ -112,6 +113,7 @@ public class ParallelIndexTuningConfigTest
1000L,
null,
null,
null,
new DynamicPartitionsSpec(100, 100L),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@ -128,7 +130,7 @@ public class ParallelIndexTuningConfigTest
null,
maxNumConcurrentSubTasks,
100,
20,
20L,
new Duration(3600),
128,
null,
@ -153,6 +155,7 @@ public class ParallelIndexTuningConfigTest
1000L,
null,
null,
null,
new DynamicPartitionsSpec(100, 100L),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@ -169,7 +172,7 @@ public class ParallelIndexTuningConfigTest
maxNumSubTasks,
null,
100,
20,
20L,
new Duration(3600),
128,
null,
@ -196,6 +199,7 @@ public class ParallelIndexTuningConfigTest
1000L,
null,
null,
null,
new DynamicPartitionsSpec(100, 100L),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@ -212,7 +216,7 @@ public class ParallelIndexTuningConfigTest
maxNumSubTasks,
maxNumSubTasks,
100,
20,
20L,
new Duration(3600),
128,
null,

View File

@ -301,6 +301,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
null,
null,
null,
null,
1,
null,
null,
@ -373,6 +374,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
null,
null,
null,
null,
2,
null,
null,
@ -431,9 +433,9 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
);
}
private static class TestSupervisorTask extends TestParallelIndexSupervisorTask
public static class TestSupervisorTask extends TestParallelIndexSupervisorTask
{
TestSupervisorTask(
public TestSupervisorTask(
String id,
TaskResource taskResource,
ParallelIndexIngestionSpec ingestionSchema,
@ -451,7 +453,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
}
}
private static class TestSinglePhaseRunner extends TestSinglePhaseParallelIndexTaskRunner
public static class TestSinglePhaseRunner extends TestSinglePhaseParallelIndexTaskRunner
{
private final ParallelIndexSupervisorTask supervisorTask;
@ -496,7 +498,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
}
}
private static class TestSinglePhaseSubTaskSpec extends SinglePhaseSubTaskSpec
public static class TestSinglePhaseSubTaskSpec extends SinglePhaseSubTaskSpec
{
private final ParallelIndexSupervisorTask supervisorTask;

View File

@ -163,11 +163,11 @@ public class IngestSegmentFirehoseFactoryTimelineTest
private void testSplit() throws Exception
{
Assert.assertTrue(factory.isSplittable());
final int numSplits = factory.getNumSplits();
final int numSplits = factory.getNumSplits(null);
// We set maxInputSegmentBytesPerTask to 2 so each segment should become its own split.
Assert.assertEquals(segmentCount, numSplits);
final List<InputSplit<List<WindowedSegmentId>>> splits =
factory.getSplits().collect(Collectors.toList());
factory.getSplits(null).collect(Collectors.toList());
Assert.assertEquals(numSplits, splits.size());
int count = 0;
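For illustration only (not part of this commit), a hedged sketch of the same check that passes an explicit split hint instead of null; the 2-byte limit and the hintedSplits name are hypothetical, and the assertion assumes every test segment is larger than 2 bytes:
// Hypothetical: pass an explicit SegmentsSplitHintSpec rather than null.
// With maxInputSegmentBytesPerTask = 2, any segment larger than 2 bytes becomes its own split.
final List<InputSplit<List<WindowedSegmentId>>> hintedSplits =
    factory.getSplits(new SegmentsSplitHintSpec(2L)).collect(Collectors.toList());
Assert.assertEquals(segmentCount, hintedSplits.size());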

View File

@ -21,6 +21,7 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig;
@ -38,28 +39,46 @@ public class ClientCompactQueryTuningConfig
@Nullable
private final Long maxTotalRows;
@Nullable
private final SplitHintSpec splitHintSpec;
@Nullable
private final IndexSpec indexSpec;
@Nullable
private final Integer maxPendingPersists;
@Nullable
private final Long pushTimeout;
@Nullable
private final Integer maxNumConcurrentSubTasks;
public static ClientCompactQueryTuningConfig from(
@Nullable UserCompactTuningConfig userCompactionTaskQueryTuningConfig,
@Nullable Integer maxRowsPerSegment
)
{
return new ClientCompactQueryTuningConfig(
maxRowsPerSegment,
userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(),
userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxBytesInMemory(),
userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxTotalRows(),
userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getIndexSpec(),
userCompactionTaskQueryTuningConfig == null
? null
: userCompactionTaskQueryTuningConfig.getMaxPendingPersists(),
userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getPushTimeout()
);
if (userCompactionTaskQueryTuningConfig == null) {
return new ClientCompactQueryTuningConfig(
maxRowsPerSegment,
null,
null,
null,
null,
null,
null,
null,
null
);
} else {
return new ClientCompactQueryTuningConfig(
maxRowsPerSegment,
userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(),
userCompactionTaskQueryTuningConfig.getMaxBytesInMemory(),
userCompactionTaskQueryTuningConfig.getMaxTotalRows(),
userCompactionTaskQueryTuningConfig.getSplitHintSpec(),
userCompactionTaskQueryTuningConfig.getIndexSpec(),
userCompactionTaskQueryTuningConfig.getMaxPendingPersists(),
userCompactionTaskQueryTuningConfig.getPushTimeout(),
userCompactionTaskQueryTuningConfig.getMaxNumConcurrentSubTasks()
);
}
}
@JsonCreator
@ -68,24 +87,28 @@ public class ClientCompactQueryTuningConfig
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@JsonProperty("pushTimeout") @Nullable Long pushTimeout
@JsonProperty("pushTimeout") @Nullable Long pushTimeout,
@JsonProperty("maxNumConcurrentSubTasks") @Nullable Integer maxNumConcurrentSubTasks
)
{
this.maxRowsPerSegment = maxRowsPerSegment;
this.maxBytesInMemory = maxBytesInMemory;
this.maxRowsInMemory = maxRowsInMemory;
this.maxTotalRows = maxTotalRows;
this.splitHintSpec = splitHintSpec;
this.indexSpec = indexSpec;
this.maxPendingPersists = maxPendingPersists;
this.pushTimeout = pushTimeout;
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
}
@JsonProperty
public String getType()
{
return "index";
return "index_parallel";
}
@JsonProperty
@ -116,6 +139,13 @@ public class ClientCompactQueryTuningConfig
return maxTotalRows;
}
@Nullable
@JsonProperty
public SplitHintSpec getSplitHintSpec()
{
return splitHintSpec;
}
public long getMaxTotalRowsOr(long defaultMaxTotalRows)
{
return maxTotalRows == null ? defaultMaxTotalRows : maxTotalRows;
@ -142,6 +172,13 @@ public class ClientCompactQueryTuningConfig
return pushTimeout;
}
@JsonProperty
@Nullable
public Integer getMaxNumConcurrentSubTasks()
{
return maxNumConcurrentSubTasks;
}
@Override
public boolean equals(Object o)
{
@ -158,7 +195,8 @@ public class ClientCompactQueryTuningConfig
Objects.equals(maxTotalRows, that.maxTotalRows) &&
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(maxPendingPersists, that.maxPendingPersists) &&
Objects.equals(pushTimeout, that.pushTimeout);
Objects.equals(pushTimeout, that.pushTimeout) &&
Objects.equals(maxNumConcurrentSubTasks, that.maxNumConcurrentSubTasks);
}
@Override
@ -171,7 +209,8 @@ public class ClientCompactQueryTuningConfig
maxTotalRows,
indexSpec,
maxPendingPersists,
pushTimeout
pushTimeout,
maxNumConcurrentSubTasks
);
}
@ -186,6 +225,7 @@ public class ClientCompactQueryTuningConfig
", indexSpec=" + indexSpec +
", maxPendingPersists=" + maxPendingPersists +
", pushTimeout=" + pushTimeout +
", maxNumConcurrentSubTasks=" + maxNumConcurrentSubTasks +
'}';
}
}
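As a rough sketch (not part of the commit) of how the extended constructor lines up with the new splitHintSpec and maxNumConcurrentSubTasks fields; the values below are purely illustrative and any of them may be null to fall back to defaults:
final ClientCompactQueryTuningConfig tuningConfig = new ClientCompactQueryTuningConfig(
    5_000_000,                                     // maxRowsPerSegment
    null,                                          // maxRowsInMemory
    null,                                          // maxBytesInMemory
    null,                                          // maxTotalRows
    new SegmentsSplitHintSpec(500L * 1024 * 1024), // splitHintSpec: ~500MB of input segments per task
    null,                                          // indexSpec
    null,                                          // maxPendingPersists
    null,                                          // pushTimeout
    10                                             // maxNumConcurrentSubTasks
);
// getType() on the resulting config now reports "index_parallel" rather than "index".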

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import javax.annotation.Nullable;
@ -84,13 +85,13 @@ public class InlineFirehoseFactory implements FiniteFirehoseFactory<StringInputR
}
@Override
public Stream<InputSplit<String>> getSplits()
public Stream<InputSplit<String>> getSplits(@Nullable SplitHintSpec splitHintSpec)
{
return Stream.of(new InputSplit<>(data));
}
@Override
public int getNumSplits()
public int getNumSplits(@Nullable SplitHintSpec splitHintSpec)
{
return 1;
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.segment.IndexSpec;
import org.joda.time.Period;
@ -158,9 +159,11 @@ public class DataSourceCompactionConfig
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@JsonProperty("pushTimeout") @Nullable Long pushTimeout
@JsonProperty("pushTimeout") @Nullable Long pushTimeout,
@JsonProperty("maxNumConcurrentSubTasks") @Nullable Integer maxNumConcurrentSubTasks
)
{
super(
@ -168,9 +171,11 @@ public class DataSourceCompactionConfig
maxRowsInMemory,
maxBytesInMemory,
maxTotalRows,
splitHintSpec,
indexSpec,
maxPendingPersists,
pushTimeout
pushTimeout,
maxNumConcurrentSubTasks
);
}

View File

@ -92,6 +92,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(indexingServiceClient.getActiveTasks());
// dataSource -> list of intervals of compact tasks
final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size());
int numEstimatedNonCompleteCompactionTasks = 0;
for (TaskStatusPlus status : compactTasks) {
final TaskPayloadResponse response = indexingServiceClient.getTaskPayload(status.getId());
if (response == null) {
@ -101,6 +102,8 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
final Interval interval = compactQuery.getIoConfig().getInputSpec().getInterval();
compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
final int numSubTasks = findNumMaxConcurrentSubTasks(compactQuery.getTuningConfig());
numEstimatedNonCompleteCompactionTasks += numSubTasks + 1; // count the compaction task itself
} else {
throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
}
@ -112,13 +115,19 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(),
dynamicConfig.getMaxCompactionTaskSlots()
);
final int numNonCompleteCompactionTasks = compactTasks.size();
final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0
? Math.max(0, compactionTaskCapacity - numNonCompleteCompactionTasks)
// compactionTaskCapacity might be 0 if totalWorkerCapacity is low.
// This guarantees that at least one slot is available if
// compaction is enabled and numRunningCompactTasks is 0.
: Math.max(1, compactionTaskCapacity);
final int numAvailableCompactionTaskSlots;
if (numEstimatedNonCompleteCompactionTasks > 0) {
numAvailableCompactionTaskSlots = Math.max(
0,
compactionTaskCapacity - numEstimatedNonCompleteCompactionTasks
);
} else {
// compactionTaskCapacity might be 0 if totalWorkerCapacity is low.
// This guarantees that at least one slot is available if
// compaction is enabled and numRunningCompactTasks is 0.
numAvailableCompactionTaskSlots = Math.max(1, compactionTaskCapacity);
}
LOG.info(
"Found [%d] available task slots for compaction out of [%d] max compaction task capacity",
numAvailableCompactionTaskSlots,
@ -141,6 +150,25 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
.build();
}
/**
* Each compaction task can run a parallel indexing task. When we count the number of currently running
* compaction tasks, we should count their sub tasks as well. However, we currently don't have a good way to
* get the number of currently running sub tasks other than polling each supervisor task, which would be
* complex because it would have to handle all kinds of failures. Here, we simply return
* {@code maxNumConcurrentSubTasks} to estimate the number of sub tasks conservatively. This should be fine
* since it won't affect the performance of other ingestion types.
*/
private int findNumMaxConcurrentSubTasks(@Nullable ClientCompactQueryTuningConfig tuningConfig)
{
if (tuningConfig != null && tuningConfig.getMaxNumConcurrentSubTasks() != null) {
// The actual number of subtasks might be smaller than the configured max.
// However, we use the max to simplify the estimation here.
return tuningConfig.getMaxNumConcurrentSubTasks();
} else {
return 0;
}
}
private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus> taskStatuses)
{
return taskStatuses
@ -164,7 +192,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
{
int numSubmittedTasks = 0;
for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots; numSubmittedTasks++) {
for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots;) {
final List<DataSegment> segmentsToCompact = iterator.next();
if (!segmentsToCompact.isEmpty()) {
@ -182,6 +210,8 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
taskId,
Iterables.transform(segmentsToCompact, DataSegment::getId)
);
// Count the compaction task itself + its sub tasks
numSubmittedTasks += findNumMaxConcurrentSubTasks(config.getTuningConfig()) + 1;
} else {
throw new ISE("segmentsToCompact is empty?");
}
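To make the slot accounting above concrete with hypothetical numbers: if compactionTaskCapacity is 10 and two non-complete compaction tasks each set maxNumConcurrentSubTasks = 3 in their tuning configs, then numEstimatedNonCompleteCompactionTasks = 2 * (3 + 1) = 8 (each supervisor task plus its estimated sub tasks), so numAvailableCompactionTaskSlots = max(0, 10 - 8) = 2. If no compaction tasks are running at all, max(1, compactionTaskCapacity) still guarantees at least one slot.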

View File

@ -85,7 +85,7 @@ public class InlineFirehoseFactoryTest
{
Assert.assertTrue(target instanceof FiniteFirehoseFactory);
Assert.assertFalse(target.isSplittable());
Assert.assertEquals(1, target.getNumSplits());
Assert.assertEquals(1, target.getNumSplits(null));
}
@Test(expected = NullPointerException.class)
@ -115,7 +115,7 @@ public class InlineFirehoseFactoryTest
@Test
public void testForcedSplitAndClone()
{
Optional<InputSplit<String>> inputSplitOptional = target.getSplits().findFirst();
Optional<InputSplit<String>> inputSplitOptional = target.getSplits(null).findFirst();
Assert.assertTrue(inputSplitOptional.isPresent());
FiniteFirehoseFactory<StringInputRowParser, String> cloneWithSplit = target.withSplit(inputSplitOptional.get());
Assert.assertTrue(cloneWithSplit instanceof InlineFirehoseFactory);

View File

@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
@ -93,7 +94,7 @@ public class DataSourceCompactionConfigTest
@Test
public void testSerdeUserCompactTuningConfig() throws IOException
{
final UserCompactTuningConfig config = new UserCompactTuningConfig(null, null, null, null, null, null);
final UserCompactTuningConfig config = new UserCompactTuningConfig(null, null, null, null, null, null, null, null);
final String json = OBJECT_MAPPER.writeValueAsString(config);
// Check maxRowsPerSegment doesn't exist in the JSON string
Assert.assertFalse(json.contains("maxRowsPerSegment"));
@ -116,6 +117,8 @@ public class DataSourceCompactionConfigTest
10000L,
null,
null,
null,
null,
null
),
ImmutableMap.of("key", "val")
@ -147,6 +150,8 @@ public class DataSourceCompactionConfigTest
10000L,
null,
null,
null,
null,
null
),
ImmutableMap.of("key", "val")
@ -171,6 +176,7 @@ public class DataSourceCompactionConfigTest
1000,
10000L,
2000L,
new SegmentsSplitHintSpec(10000L),
new IndexSpec(
new RoaringBitmapSerdeFactory(false),
CompressionStrategy.LZF,
@ -178,7 +184,8 @@ public class DataSourceCompactionConfigTest
LongEncodingStrategy.LONGS
),
1,
3000L
3000L,
null
);
final String json = OBJECT_MAPPER.writeValueAsString(tuningConfig);

View File

@ -918,6 +918,8 @@ InlineFirehose
LocalFirehose
PartitionsSpec
PasswordProviders
SegmentsSplitHintSpec
SplitHintSpec
appendToExisting
baseDir
chatHandlerNumRetries
@ -938,6 +940,7 @@ reportParseExceptions
segmentWriteOutMediumFactory
sql
sqls
splitHintSpec
taskStatusCheckPeriodMs
timeChunk
totalNumMergeTasks