Allow missing intervals for Parallel task with hash/range partitioning (#10592)

* Allow missing intervals for Parallel task

* fix row filter

* fix tests

* fix log
Authored by Jihoon Son on 2020-11-25 14:50:22 -08:00; committed by GitHub
parent d0c2ede50c
commit 7462b0b953
23 changed files with 526 additions and 247 deletions
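In short: with this change, a parallel task using hash or range partitioning no longer needs explicit intervals in its granularitySpec; the supervisor task determines them by scanning the input during the cardinality/distribution phase. A minimal sketch of the now-accepted shape (the concrete granularities below are illustrative assumptions, not part of this commit):

// Hypothetical granularitySpec with the intervals omitted. Before this change,
// forceGuaranteedRollup with missing intervals failed fast in the supervisor task;
// now the intervals are discovered from the data and written back into the spec.
GranularitySpec granularitySpec = new UniformGranularitySpec(
    Granularities.DAY,     // segmentGranularity
    Granularities.MINUTE,  // queryGranularity
    null                   // inputIntervals intentionally missing
);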

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import java.io.IOException;
/**
* A {@link TaskActionClient} that wraps a given {@link TaskAction} with {@link SurrogateAction}.
* All subtasks of {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask} must
* use this client or wrap taskActions manually.
*/
public class SurrogateTaskActionClient implements TaskActionClient
{
private final String supervisorTaskId;
private final TaskActionClient delegate;
public SurrogateTaskActionClient(String supervisorTaskId, TaskActionClient delegate)
{
this.supervisorTaskId = supervisorTaskId;
this.delegate = delegate;
}
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
return delegate.submit(new SurrogateAction<>(supervisorTaskId, taskAction));
}
}
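For illustration, a minimal usage sketch of the new client (supervisorTaskId and toolbox are assumed to come from the calling subtask; they are not part of this file):

// Wrap the delegate once; every action submitted through this client is then
// sent as a SurrogateAction on behalf of the supervisor task.
TaskActionClient client =
    new SurrogateTaskActionClient(supervisorTaskId, toolbox.getTaskActionClient());
// client.submit(action) is equivalent to
// toolbox.getTaskActionClient().submit(new SurrogateAction<>(supervisorTaskId, action))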

View File

@ -25,7 +25,8 @@ import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.TaskLockHelper.OverwritingRootGenerationPartitions;
import org.apache.druid.indexing.common.task.batch.parallel.SupervisorTaskAccess;
import org.apache.druid.java.util.common.ISE;
@ -58,8 +59,12 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
final PartitionsSpec partitionsSpec
)
{
final TaskActionClient taskActionClient =
supervisorTaskAccess == null
? toolbox.getTaskActionClient()
: new SurrogateTaskActionClient(supervisorTaskAccess.getSupervisorTaskId(), toolbox.getTaskActionClient());
this.internalAllocator = new ActionBasedSegmentAllocator(
toolbox.getTaskActionClient(),
taskActionClient,
dataSchema,
(schema, row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> {
final GranularitySpec granularitySpec = schema.getGranularitySpec();
@ -72,34 +77,17 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
taskLockHelper,
interval
);
if (supervisorTaskAccess != null) {
return new SurrogateAction<>(
supervisorTaskAccess.getSupervisorTaskId(),
new SegmentAllocateAction(
schema.getDataSource(),
row.getTimestamp(),
schema.getGranularitySpec().getQueryGranularity(),
schema.getGranularitySpec().getSegmentGranularity(),
sequenceName,
previousSegmentId,
skipSegmentLineageCheck,
partialShardSpec,
taskLockHelper.getLockGranularityToUse()
)
);
} else {
return new SegmentAllocateAction(
schema.getDataSource(),
row.getTimestamp(),
schema.getGranularitySpec().getQueryGranularity(),
schema.getGranularitySpec().getSegmentGranularity(),
sequenceName,
previousSegmentId,
skipSegmentLineageCheck,
partialShardSpec,
taskLockHelper.getLockGranularityToUse()
);
}
return new SegmentAllocateAction(
schema.getDataSource(),
row.getTimestamp(),
schema.getGranularitySpec().getQueryGranularity(),
schema.getGranularitySpec().getSegmentGranularity(),
sequenceName,
previousSegmentId,
skipSegmentLineageCheck,
partialShardSpec,
taskLockHelper.getLockGranularityToUse()
);
}
);
this.sequenceNameFunction = new LinearlyPartitionedSequenceNameFunction(taskId);

View File

@ -63,6 +63,11 @@ public class ParallelIndexIngestionSpec extends IngestionSpec<ParallelIndexIOCon
this.tuningConfig = tuningConfig == null ? ParallelIndexTuningConfig.defaultConfig() : tuningConfig;
}
public ParallelIndexIngestionSpec withDataSchema(DataSchema dataSchema)
{
return new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
}
@Override
@JsonProperty("dataSchema")
public DataSchema getDataSchema()

View File

@ -188,10 +188,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
throw new ISE("forceGuaranteedRollup is set but intervals is missing in granularitySpec");
}
}
this.baseInputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
@ -290,7 +286,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@VisibleForTesting
PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenerateRunner(
TaskToolbox toolbox,
Integer numShardsOverride
ParallelIndexIngestionSpec ingestionSchema,
@Nullable Map<Interval, Integer> intervalToNumShardsOverride
)
{
return new PartialHashSegmentGenerateParallelIndexTaskRunner(
@ -299,7 +296,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
getGroupId(),
ingestionSchema,
getContext(),
numShardsOverride
intervalToNumShardsOverride
);
}
@ -318,7 +315,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@VisibleForTesting
PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGenerateRunner(
TaskToolbox toolbox,
Map<Interval, PartitionBoundaries> intervalToPartitions
Map<Interval, PartitionBoundaries> intervalToPartitions,
ParallelIndexIngestionSpec ingestionSchema
)
{
return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
@ -334,16 +332,17 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@VisibleForTesting
PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner(
TaskToolbox toolbox,
List<PartialGenericSegmentMergeIOConfig> ioConfigs
List<PartialGenericSegmentMergeIOConfig> ioConfigs,
ParallelIndexIngestionSpec ingestionSchema
)
{
return new PartialGenericSegmentMergeParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
getIngestionSchema().getDataSchema(),
ingestionSchema.getDataSchema(),
ioConfigs,
getIngestionSchema().getTuningConfig(),
ingestionSchema.getTuningConfig(),
getContext()
);
}
@ -529,9 +528,30 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
: runHashPartitionMultiPhaseParallel(toolbox);
}
private static ParallelIndexIngestionSpec rewriteIngestionSpecWithIntervalsIfMissing(
ParallelIndexIngestionSpec ingestionSchema,
Collection<Interval> intervals
)
{
if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
return ingestionSchema
.withDataSchema(
ingestionSchema.getDataSchema().withGranularitySpec(
ingestionSchema
.getDataSchema()
.getGranularitySpec()
.withIntervals(new ArrayList<>(intervals))
)
);
} else {
return ingestionSchema;
}
}
private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
TaskState state;
ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
// only range and hash partitioning is supported for multiphase parallel ingestion, see runMultiPhaseParallel()
@ -541,49 +561,64 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
);
}
final Integer numShardsOverride;
final Map<Interval, Integer> intervalToNumShards;
HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec();
if (partitionsSpec.getNumShards() == null) {
// 0. need to determine numShards by scanning the data
LOG.info("numShards is unspecified, beginning %s phase.", PartialDimensionCardinalityTask.TYPE);
final boolean needsInputSampling =
partitionsSpec.getNumShards() == null
|| ingestionSchemaToUse.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
if (needsInputSampling) {
// 0. need to determine intervals and numShards by scanning the data
LOG.info("Needs to determine intervals or numShards, beginning %s phase.", PartialDimensionCardinalityTask.TYPE);
ParallelIndexTaskRunner<PartialDimensionCardinalityTask, DimensionCardinalityReport> cardinalityRunner =
createRunner(
toolbox,
this::createPartialDimensionCardinalityRunner
);
if (cardinalityRunner == null) {
throw new ISE("Could not create cardinality runner for hash partitioning.");
}
state = runNextPhase(cardinalityRunner);
if (state.isFailure()) {
return TaskStatus.failure(getId());
}
int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null
? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
: partitionsSpec.getMaxRowsPerSegment();
LOG.info("effective maxRowsPerSegment is: " + effectiveMaxRowsPerSegment);
if (cardinalityRunner.getReports() == null) {
throw new ISE("Could not determine cardinalities for hash partitioning.");
if (cardinalityRunner.getReports().isEmpty()) {
String msg = "No valid rows for hash partitioning."
+ " All rows may have invalid timestamps or have been filtered out.";
LOG.warn(msg);
return TaskStatus.success(getId(), msg);
}
numShardsOverride = determineNumShardsFromCardinalityReport(
cardinalityRunner.getReports().values(),
effectiveMaxRowsPerSegment
);
LOG.info("Automatically determined numShards: " + numShardsOverride);
if (partitionsSpec.getNumShards() == null) {
int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null
? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
: partitionsSpec.getMaxRowsPerSegment();
LOG.info("effective maxRowsPerSegment is: " + effectiveMaxRowsPerSegment);
intervalToNumShards = determineNumShardsFromCardinalityReport(
cardinalityRunner.getReports().values(),
effectiveMaxRowsPerSegment
);
} else {
intervalToNumShards = CollectionUtils.mapValues(
mergeCardinalityReports(cardinalityRunner.getReports().values()),
k -> partitionsSpec.getNumShards()
);
}
ingestionSchemaToUse = rewriteIngestionSpecWithIntervalsIfMissing(
ingestionSchemaToUse,
intervalToNumShards.keySet()
);
} else {
numShardsOverride = null;
// numShards will be determined in PartialHashSegmentGenerateTask
intervalToNumShards = null;
}
// 1. Partial segment generation phase
final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse;
ParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
createRunner(
toolbox,
f -> createPartialHashSegmentGenerateRunner(toolbox, numShardsOverride)
f -> createPartialHashSegmentGenerateRunner(toolbox, segmentCreateIngestionSpec, intervalToNumShards)
);
state = runNextPhase(indexingRunner);
@ -600,9 +635,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
partitionToLocations
);
final ParallelIndexIngestionSpec segmentMergeIngestionSpec = ingestionSchemaToUse;
final ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
toolbox,
tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec)
);
state = runNextPhase(mergeRunner);
if (state.isSuccess()) {
@ -615,6 +651,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
ParallelIndexTaskRunner<PartialDimensionDistributionTask, DimensionDistributionReport> distributionRunner =
createRunner(
toolbox,
@ -631,13 +668,22 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
if (intervalToPartitions.isEmpty()) {
String msg = "No valid rows for single dimension partitioning."
+ " All rows may have invalid timestamps or multiple dimension values.";
+ " All rows may have invalid timestamps or multiple dimension values.";
LOG.warn(msg);
return TaskStatus.success(getId(), msg);
}
ingestionSchemaToUse = rewriteIngestionSpecWithIntervalsIfMissing(
ingestionSchemaToUse,
intervalToPartitions.keySet()
);
final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse;
ParallelIndexTaskRunner<PartialRangeSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
createRunner(toolbox, tb -> createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions));
createRunner(
toolbox,
tb -> createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions, segmentCreateIngestionSpec)
);
TaskState indexingState = runNextPhase(indexingRunner);
if (indexingState.isFailure()) {
@ -652,9 +698,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
partitionToLocations
);
final ParallelIndexIngestionSpec segmentMergeIngestionSpec = ingestionSchemaToUse;
ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
toolbox,
tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec)
);
TaskState mergeState = runNextPhase(mergeRunner);
if (mergeState.isSuccess()) {
@ -664,48 +711,45 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
return TaskStatus.fromCode(getId(), mergeState);
}
private static Map<Interval, Union> mergeCardinalityReports(Collection<DimensionCardinalityReport> reports)
{
Map<Interval, Union> finalCollectors = new HashMap<>();
reports.forEach(report -> {
Map<Interval, byte[]> intervalToCardinality = report.getIntervalToCardinalities();
for (Map.Entry<Interval, byte[]> entry : intervalToCardinality.entrySet()) {
HllSketch entryHll = HllSketch.wrap(Memory.wrap(entry.getValue()));
finalCollectors.computeIfAbsent(
entry.getKey(),
k -> new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K)
).update(entryHll);
}
});
return finalCollectors;
}
@VisibleForTesting
public static int determineNumShardsFromCardinalityReport(
public static Map<Interval, Integer> determineNumShardsFromCardinalityReport(
Collection<DimensionCardinalityReport> reports,
int maxRowsPerSegment
)
{
// aggregate all the sub-reports
Map<Interval, Union> finalCollectors = new HashMap<>();
reports.forEach(report -> {
Map<Interval, byte[]> intervalToCardinality = report.getIntervalToCardinalities();
for (Map.Entry<Interval, byte[]> entry : intervalToCardinality.entrySet()) {
Union union = finalCollectors.computeIfAbsent(
entry.getKey(),
(key) -> {
return new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K);
}
);
HllSketch entryHll = HllSketch.wrap(Memory.wrap(entry.getValue()));
union.update(entryHll);
}
});
Map<Interval, Union> finalCollectors = mergeCardinalityReports(reports);
// determine the highest cardinality in any interval
long maxCardinality = 0;
for (Union union : finalCollectors.values()) {
maxCardinality = Math.max(maxCardinality, (long) union.getEstimate());
}
LOG.info("Estimated max cardinality: " + maxCardinality);
// determine numShards based on maxRowsPerSegment and the highest per-interval cardinality
long numShards = maxCardinality / maxRowsPerSegment;
if (maxCardinality % maxRowsPerSegment != 0) {
// if there's a remainder add 1 so we stay under maxRowsPerSegment
numShards += 1;
}
try {
return Math.toIntExact(numShards);
}
catch (ArithmeticException ae) {
throw new ISE("Estimated numShards [%s] exceeds integer bounds.", numShards);
}
return CollectionUtils.mapValues(
finalCollectors,
union -> {
final double estimatedCardinality = union.getEstimate();
// determine numShards based on maxRowsPerSegment and the cardinality
final long estimatedNumShards = Math.round(estimatedCardinality / maxRowsPerSegment);
try {
return Math.max(Math.toIntExact(estimatedNumShards), 1);
}
catch (ArithmeticException ae) {
throw new ISE("Estimated numShards [%s] exceeds integer bounds.", estimatedNumShards);
}
}
);
}
private Map<Interval, PartitionBoundaries> determineAllRangePartitions(Collection<DimensionDistributionReport> reports)

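To make the new per-interval shard math concrete: for each interval, numShards = max(round(estimatedCardinality / maxRowsPerSegment), 1), and the key set of the resulting map is what rewriteIngestionSpecWithIntervalsIfMissing writes back into the spec when intervals were omitted. A small arithmetic sketch with illustrative numbers:

// An interval whose estimated cardinality is 4, with maxRowsPerSegment 3.
double estimatedCardinality = 4.0;
int maxRowsPerSegment = 3;
long estimatedNumShards = Math.round(estimatedCardinality / maxRowsPerSegment); // round(1.33) = 1
int numShards = Math.max(Math.toIntExact(estimatedNumShards), 1);               // 1 shard for this interval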
View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.datasketches.hll.HllSketch;
import org.apache.druid.data.input.InputFormat;
@ -32,12 +33,12 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
@ -52,11 +53,11 @@ import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
{
public static final String TYPE = "partial_dimension_cardinality";
private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
@ -125,10 +126,14 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return tryTimeChunkLock(
taskActionClient,
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
return tryTimeChunkLock(
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
} else {
return true;
}
}
@Override
@ -159,6 +164,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
tuningConfig.getMaxParseExceptions(),
tuningConfig.getMaxSavedParseExceptions()
);
final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty();
try (
final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
@ -166,7 +172,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
dataSchema,
inputSource,
inputFormat,
AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
determineIntervals ? Objects::nonNull : AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
buildSegmentsMeters,
parseExceptionHandler
);
@ -197,8 +203,15 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
InputRow inputRow = inputRowIterator.next();
// null rows are filtered out by FilteringCloseableInputRowIterator
DateTime timestamp = inputRow.getTimestamp();
//noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
Interval interval = granularitySpec.bucketInterval(timestamp).get();
final Interval interval;
if (granularitySpec.inputIntervals().isEmpty()) {
interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
} else {
final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
// this interval must exist since it passed the rowFilter
assert optInterval.isPresent();
interval = optInterval.get();
}
Granularity queryGranularity = granularitySpec.getQueryGranularity();
HllSketch hllSketch = intervalToCardinalities.computeIfAbsent(

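Both partial dimension tasks (cardinality above, distribution below) now share the same pattern when the granularitySpec carries no input intervals; a condensed sketch (granularitySpec and inputRow are assumed to be in scope, as in the diff):

// With no configured intervals, accept every non-null row and bucket it by
// segment granularity; otherwise keep defaultRowFilter and look the bucket up.
final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty();
final Interval interval = determineIntervals
    ? granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp())
    : granularitySpec.bucketInterval(inputRow.getTimestamp()).get();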
View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.hash.BloomFilter;
@ -34,6 +35,7 @@ import org.apache.druid.data.input.Rows;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
@ -55,6 +57,7 @@ import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
/**
@ -163,10 +166,14 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return tryTimeChunkLock(
taskActionClient,
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
return tryTimeChunkLock(
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
} else {
return true;
}
}
@Override
@ -195,6 +202,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
tuningConfig.getMaxParseExceptions(),
tuningConfig.getMaxSavedParseExceptions()
);
final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty();
try (
final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
@ -202,7 +210,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
dataSchema,
inputSource,
inputFormat,
AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
determineIntervals ? Objects::nonNull : AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
buildSegmentsMeters,
parseExceptionHandler
);
@ -243,10 +251,15 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
continue;
}
DateTime timestamp = inputRow.getTimestamp();
//noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
Interval interval = granularitySpec.bucketInterval(timestamp).get();
final Interval interval;
if (granularitySpec.inputIntervals().isEmpty()) {
interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
} else {
final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
// this interval must exist since it passed the rowFilter
assert optInterval.isPresent();
interval = optInterval.get();
}
String partitionDimensionValue = Iterables.getOnlyElement(inputRow.getDimension(partitionDimension));
if (inputRowFilter.accept(interval, partitionDimensionValue, inputRow)) {

View File

@ -21,7 +21,9 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.indexing.common.TaskToolbox;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Map;
/**
@ -32,7 +34,8 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
{
private static final String PHASE_NAME = "partial segment generation";
private Integer numShardsOverride;
@Nullable
private final Map<Interval, Integer> intervalToNumShardsOverride;
PartialHashSegmentGenerateParallelIndexTaskRunner(
TaskToolbox toolbox,
@ -40,11 +43,11 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
String groupId,
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context,
Integer numShardsOverride
@Nullable Map<Interval, Integer> intervalToNumShardsOverride
)
{
super(toolbox, taskId, groupId, ingestionSchema, context);
this.numShardsOverride = numShardsOverride;
this.intervalToNumShardsOverride = intervalToNumShardsOverride;
}
@Override
@ -82,7 +85,7 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
numAttempts,
subTaskIngestionSpec,
context,
numShardsOverride
intervalToNumShardsOverride
);
}
};

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
@ -57,7 +58,8 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
private final String supervisorTaskId;
private final Integer numShardsOverride;
@Nullable
private final Map<Interval, Integer> intervalToNumShardsOverride;
@JsonCreator
public PartialHashSegmentGenerateTask(
@ -69,7 +71,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
@JsonProperty("numAttempts") final int numAttempts, // zero-based counting
@JsonProperty(PROP_SPEC) final ParallelIndexIngestionSpec ingestionSchema,
@JsonProperty("context") final Map<String, Object> context,
@Nullable @JsonProperty("numShardsOverride") final Integer numShardsOverride
@JsonProperty("intervalToNumShardsOverride") @Nullable final Map<Interval, Integer> intervalToNumShardsOverride
)
{
super(
@ -85,7 +87,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
this.supervisorTaskId = supervisorTaskId;
this.numShardsOverride = numShardsOverride;
this.intervalToNumShardsOverride = intervalToNumShardsOverride;
}
@JsonProperty
@ -106,6 +108,13 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
return supervisorTaskId;
}
@Nullable
@JsonProperty
public Map<Interval, Integer> getIntervalToNumShardsOverride()
{
return intervalToNumShardsOverride;
}
@Override
public String getType()
{
@ -116,7 +125,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return tryTimeChunkLock(
taskActionClient,
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
}
@ -134,7 +143,11 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
getId(),
granularitySpec,
new SupervisorTaskAccess(supervisorTaskId, taskClient),
createHashPartitionAnalysisFromPartitionsSpec(granularitySpec, partitionsSpec, numShardsOverride)
createHashPartitionAnalysisFromPartitionsSpec(
granularitySpec,
partitionsSpec,
intervalToNumShardsOverride
)
);
}
@ -170,22 +183,24 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
public static HashPartitionAnalysis createHashPartitionAnalysisFromPartitionsSpec(
GranularitySpec granularitySpec,
@Nonnull HashedPartitionsSpec partitionsSpec,
@Nullable Integer numShardsOverride
@Nullable Map<Interval, Integer> intervalToNumShardsOverride
)
{
final SortedSet<Interval> intervals = granularitySpec.bucketIntervals().get();
final int numBucketsPerInterval;
if (numShardsOverride != null) {
numBucketsPerInterval = numShardsOverride;
} else {
numBucketsPerInterval = partitionsSpec.getNumShards() == null
? 1
: partitionsSpec.getNumShards();
}
final HashPartitionAnalysis partitionAnalysis = new HashPartitionAnalysis(partitionsSpec);
intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval));
if (intervalToNumShardsOverride != null) {
// Some intervals populated from granularitySpec can be missing in intervalToNumShardsOverride
// because intervalToNumShardsOverride contains only the intervals which exist in input data.
// We only care about the intervals in intervalToNumShardsOverride here.
intervalToNumShardsOverride.forEach(partitionAnalysis::updateBucket);
} else {
final SortedSet<Interval> intervals = granularitySpec.bucketIntervals().get();
final int numBucketsPerInterval = partitionsSpec.getNumShards() == null
? 1
: partitionsSpec.getNumShards();
intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval));
}
return partitionAnalysis;
}
}
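A trimmed usage sketch of the override path above (the interval values are illustrative, and granularitySpec is assumed to be any GranularitySpec, which goes unread when the map is supplied; the full version is in PartialHashSegmentGenerateTaskTest further down):

// Only intervals that actually occur in the input data appear in the map,
// so buckets come from the map rather than from granularitySpec.bucketIntervals().
Map<Interval, Integer> intervalToNumShards = ImmutableMap.of(
    Intervals.of("2020-01-01/2020-01-02"), 1,
    Intervals.of("2020-01-02/2020-01-03"), 2
);
HashPartitionAnalysis partitionAnalysis = PartialHashSegmentGenerateTask
    .createHashPartitionAnalysisFromPartitionsSpec(
        granularitySpec,
        new HashedPartitionsSpec(null, null, null),
        intervalToNumShards
    );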

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
@ -135,9 +136,12 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
}
@Override
public boolean isReady(TaskActionClient taskActionClient)
public boolean isReady(TaskActionClient taskActionClient) throws IOException
{
return true;
return tryTimeChunkLock(
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
}
@Override

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@ -81,6 +82,10 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
context
);
Preconditions.checkArgument(
!ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(),
"Missing intervals in granularitySpec"
);
this.ingestionSchema = ingestionSchema;
this.supervisorTaskId = supervisorTaskId;
this.inputRowIteratorBuilder = inputRowIteratorBuilder;

View File

@ -101,6 +101,10 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
context
);
Preconditions.checkArgument(
!dataSchema.getGranularitySpec().inputIntervals().isEmpty(),
"Missing intervals in granularitySpec"
);
this.ioConfig = ioConfig;
this.numAttempts = numAttempts;
this.supervisorTaskId = supervisorTaskId;

View File

@ -62,12 +62,7 @@ abstract class PerfectRollupWorkerTask extends AbstractBatchIndexTask
checkPartitionsSpec(tuningConfig.getGivenOrDefaultPartitionsSpec());
granularitySpec = dataSchema.getGranularitySpec();
Preconditions.checkArgument(
!granularitySpec.inputIntervals().isEmpty(),
"Missing intervals in granularitySpec"
);
this.granularitySpec = dataSchema.getGranularitySpec();
this.dataSchema = dataSchema;
this.tuningConfig = tuningConfig;
}

View File

@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.BatchAppenderators;
@ -149,7 +150,10 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws IOException
{
return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.getDataSchema().getGranularitySpec());
return determineLockGranularityAndTryLock(
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
ingestionSchema.getDataSchema().getGranularitySpec()
);
}
@JsonProperty

View File

@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@ -71,6 +72,7 @@ import java.util.Set;
abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest
{
protected static final String DATASOURCE = "dataSource";
protected static final Granularity SEGMENT_GRANULARITY = Granularities.DAY;
private static final ScanQueryRunnerFactory SCAN_QUERY_RUNNER_FACTORY = new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
@ -176,7 +178,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
)
{
GranularitySpec granularitySpec = new UniformGranularitySpec(
Granularities.DAY,
SEGMENT_GRANULARITY,
Granularities.MINUTE,
interval == null ? null : Collections.singletonList(interval)
);

View File

@ -109,34 +109,74 @@ public class DimensionCardinalityReportTest
reports.add(report3);
// first interval in test has cardinality 4
int numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
Map<Interval, Integer> intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
reports,
1
);
Assert.assertEquals(4L, numShards);
Assert.assertEquals(
ImmutableMap.of(
Intervals.of("1970-01-01/P1D"),
4,
Intervals.of("1970-01-02/P1D"),
1
),
intervalToNumShards
);
numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
reports,
2
);
Assert.assertEquals(2L, numShards);
Assert.assertEquals(
ImmutableMap.of(
Intervals.of("1970-01-01/P1D"),
2,
Intervals.of("1970-01-02/P1D"),
1
),
intervalToNumShards
);
numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
reports,
3
);
Assert.assertEquals(2L, numShards);
Assert.assertEquals(
ImmutableMap.of(
Intervals.of("1970-01-01/P1D"),
1,
Intervals.of("1970-01-02/P1D"),
1
),
intervalToNumShards
);
numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
reports,
4
);
Assert.assertEquals(1L, numShards);
Assert.assertEquals(
ImmutableMap.of(
Intervals.of("1970-01-01/P1D"),
1,
Intervals.of("1970-01-02/P1D"),
1
),
intervalToNumShards
);
numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
reports,
5
);
Assert.assertEquals(1L, numShards);
Assert.assertEquals(
ImmutableMap.of(
Intervals.of("1970-01-01/P1D"),
1,
Intervals.of("1970-01-02/P1D"),
1
),
intervalToNumShards
);
}
}

View File

@ -31,8 +31,10 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@ -86,35 +88,44 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
@Parameterized.Parameters(
name = "lockGranularity={0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, numShards={3}"
name = "lockGranularity={0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, intervalToIndex={3}, numShards={4}"
)
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{LockGranularity.TIME_CHUNK, false, 2, 2},
new Object[]{LockGranularity.TIME_CHUNK, true, 2, 2},
new Object[]{LockGranularity.TIME_CHUNK, true, 1, 2},
new Object[]{LockGranularity.SEGMENT, true, 2, 2},
new Object[]{LockGranularity.TIME_CHUNK, true, 2, null},
new Object[]{LockGranularity.TIME_CHUNK, true, 1, null},
new Object[]{LockGranularity.SEGMENT, true, 2, null}
new Object[]{LockGranularity.TIME_CHUNK, false, 2, INTERVAL_TO_INDEX, 2},
new Object[]{LockGranularity.TIME_CHUNK, true, 2, INTERVAL_TO_INDEX, 2},
new Object[]{LockGranularity.TIME_CHUNK, true, 2, null, 2},
new Object[]{LockGranularity.TIME_CHUNK, true, 1, INTERVAL_TO_INDEX, 2},
new Object[]{LockGranularity.SEGMENT, true, 2, INTERVAL_TO_INDEX, 2},
new Object[]{LockGranularity.TIME_CHUNK, true, 2, INTERVAL_TO_INDEX, null},
new Object[]{LockGranularity.TIME_CHUNK, true, 2, null, null},
new Object[]{LockGranularity.TIME_CHUNK, true, 1, INTERVAL_TO_INDEX, null},
new Object[]{LockGranularity.SEGMENT, true, 2, INTERVAL_TO_INDEX, null}
);
}
private final int maxNumConcurrentSubTasks;
@Nullable
private final Interval intervalToIndex;
@Nullable
private final Integer numShards;
private File inputDir;
// sorted input intervals
private List<Interval> inputIntervals;
public HashPartitionMultiPhaseParallelIndexingTest(
LockGranularity lockGranularity,
boolean useInputFormatApi,
int maxNumConcurrentSubTasks,
@Nullable Interval intervalToIndex,
@Nullable Integer numShards
)
{
super(lockGranularity, useInputFormatApi);
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
this.intervalToIndex = intervalToIndex;
this.numShards = numShards;
}
@ -122,6 +133,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
public void setup() throws IOException
{
inputDir = temporaryFolder.newFolder("data");
final Set<Interval> intervals = new HashSet<>();
// set up data
for (int i = 0; i < 10; i++) {
try (final Writer writer =
@ -129,6 +141,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
for (int j = 0; j < 10; j++) {
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 1, i + 10, i));
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 2, i + 11, i));
intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 1))));
intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 2))));
}
}
}
@ -139,33 +153,70 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", i + 1, i + 10, i));
}
}
inputIntervals = new ArrayList<>(intervals);
inputIntervals.sort(Comparators.intervalsByStartThenEnd());
}
@Test
public void testRun() throws Exception
{
final Integer maxRowsPerSegment = numShards == null ? 10 : null;
final Set<DataSegment> publishedSegments = runTestTask(
new HashedPartitionsSpec(null, numShards, ImmutableList.of("dim1", "dim2")),
new HashedPartitionsSpec(
maxRowsPerSegment,
numShards,
ImmutableList.of("dim1", "dim2")
),
TaskState.SUCCESS,
false
);
// we don't specify maxRowsPerSegment so it defaults to DEFAULT_MAX_ROWS_PER_SEGMENT,
// which is 5 million, so assume that there will only be 1 shard if numShards is not set.
int expectedSegmentCount = numShards != null ? numShards : 1;
assertHashedPartition(publishedSegments, expectedSegmentCount);
final Map<Interval, Integer> expectedIntervalToNumSegments = computeExpectedIntervalToNumSegments(
maxRowsPerSegment,
numShards
);
assertHashedPartition(publishedSegments, expectedIntervalToNumSegments);
}
@Test
public void testRunWithHashPartitionFunction() throws Exception
{
final Integer maxRowsPerSegment = numShards == null ? 10 : null;
final Set<DataSegment> publishedSegments = runTestTask(
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2"), HashPartitionFunction.MURMUR3_32_ABS),
new HashedPartitionsSpec(
maxRowsPerSegment,
numShards,
ImmutableList.of("dim1", "dim2"),
HashPartitionFunction.MURMUR3_32_ABS
),
TaskState.SUCCESS,
false
);
assertHashedPartition(publishedSegments, 2);
final Map<Interval, Integer> expectedIntervalToNumSegments = computeExpectedIntervalToNumSegments(
maxRowsPerSegment,
numShards
);
assertHashedPartition(publishedSegments, expectedIntervalToNumSegments);
}
private Map<Interval, Integer> computeExpectedIntervalToNumSegments(
@Nullable Integer maxRowsPerSegment,
@Nullable Integer numShards
)
{
final Map<Interval, Integer> expectedIntervalToNumSegments = new HashMap<>();
for (int i = 0; i < inputIntervals.size(); i++) {
if (numShards == null) {
if (i == 0 || i == inputIntervals.size() - 1) {
expectedIntervalToNumSegments.put(inputIntervals.get(i), Math.round((float) 10 / maxRowsPerSegment));
} else {
expectedIntervalToNumSegments.put(inputIntervals.get(i), Math.round((float) 20 / maxRowsPerSegment));
}
} else {
expectedIntervalToNumSegments.put(inputIntervals.get(i), numShards);
}
}
return expectedIntervalToNumSegments;
}
@Test
@ -236,7 +287,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
DIMENSIONS_SPEC,
INPUT_FORMAT,
null,
INTERVAL_TO_INDEX,
intervalToIndex,
inputDir,
"test_*",
partitionsSpec,
@ -250,7 +301,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
null,
null,
PARSE_SPEC,
INTERVAL_TO_INDEX,
intervalToIndex,
inputDir,
"test_*",
partitionsSpec,
@ -261,15 +312,21 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
}
}
private void assertHashedPartition(Set<DataSegment> publishedSegments, int expectedNumSegments) throws IOException
private void assertHashedPartition(
Set<DataSegment> publishedSegments,
Map<Interval, Integer> expectedIntervalToNumSegments
) throws IOException
{
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
publishedSegments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
Assert.assertEquals(new HashSet<>(inputIntervals), intervalToSegments.keySet());
final File tempSegmentDir = temporaryFolder.newFolder();
for (List<DataSegment> segmentsInInterval : intervalToSegments.values()) {
Assert.assertEquals(expectedNumSegments, segmentsInInterval.size());
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
Interval interval = entry.getKey();
List<DataSegment> segmentsInInterval = entry.getValue();
Assert.assertEquals(expectedIntervalToNumSegments.get(interval).intValue(), segmentsInInterval.size());
for (DataSegment segment : segmentsInInterval) {
Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
final HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec();

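A quick reading of computeExpectedIntervalToNumSegments above, using the maxRowsPerSegment of 10 that the tests pass when numShards is null (illustrative arithmetic only):

int maxRowsPerSegment = 10;
int edgeSegments   = Math.round((float) 10 / maxRowsPerSegment); // 1 segment for the first and last interval
int middleSegments = Math.round((float) 20 / maxRowsPerSegment); // 2 segments for each interval in between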
View File

@ -91,25 +91,6 @@ public class ParallelIndexSupervisorTaskSerdeTest
Assert.assertEquals(task, OBJECT_MAPPER.readValue(json, Task.class));
}
@Test
public void forceGuaranteedRollupWithMissingIntervals()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage(
"forceGuaranteedRollup is set but intervals is missing in granularitySpec"
);
Integer numShards = 2;
new ParallelIndexSupervisorTaskBuilder()
.ingestionSpec(
new ParallelIndexIngestionSpecBuilder()
.forceGuaranteedRollup(true)
.partitionsSpec(new HashedPartitionsSpec(null, numShards, null))
.build()
)
.build();
}
@Test
public void forceGuaranteedRollupWithHashPartitionsMissingNumShards()
{

View File

@ -108,19 +108,6 @@ public class PartialDimensionCardinalityTaskTest
.build();
}
@Test
public void requiresGranularitySpecInputIntervals()
{
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Missing intervals in granularitySpec");
DataSchema dataSchema = ParallelIndexTestingFactory.createDataSchema(Collections.emptyList());
new PartialDimensionCardinalityTaskBuilder()
.dataSchema(dataSchema)
.build();
}
@Test
public void serializesDeserializes()
{

View File

@ -111,19 +111,6 @@ public class PartialDimensionDistributionTaskTest
.build();
}
@Test
public void requiresGranularitySpecInputIntervals()
{
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Missing intervals in granularitySpec");
DataSchema dataSchema = ParallelIndexTestingFactory.createDataSchema(Collections.emptyList());
new PartialDimensionDistributionTaskBuilder()
.dataSchema(dataSchema)
.build();
}
@Test
public void serializesDeserializes()
{

View File

@ -24,7 +24,9 @@ import org.apache.druid.segment.TestHelper;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
@ -54,6 +56,9 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
.build()
);
@Rule
public ExpectedException exception = ExpectedException.none();
private PartialGenericSegmentMergeTask target;
@Before
@ -82,4 +87,27 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
String id = target.getId();
Assert.assertThat(id, Matchers.startsWith(PartialGenericSegmentMergeTask.TYPE));
}
@Test
public void requiresGranularitySpecInputIntervals()
{
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Missing intervals in granularitySpec");
new PartialGenericSegmentMergeTask(
ParallelIndexTestingFactory.AUTOMATIC_ID,
ParallelIndexTestingFactory.GROUP_ID,
ParallelIndexTestingFactory.TASK_RESOURCE,
ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
ParallelIndexTestingFactory.NUM_ATTEMPTS,
new PartialGenericSegmentMergeIngestionSpec(
ParallelIndexTestingFactory.createDataSchema(null),
IO_CONFIG,
new ParallelIndexTestingFactory.TuningConfigBuilder()
.partitionsSpec(PARTITIONS_SPEC)
.build()
),
ParallelIndexTestingFactory.CONTEXT
);
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@ -33,10 +34,13 @@ import org.hamcrest.Matchers;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.util.List;
import java.util.Map;
public class PartialHashSegmentGenerateTaskTest
{
@ -48,6 +52,9 @@ public class PartialHashSegmentGenerateTaskTest
ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS)
);
@Rule
public ExpectedException expectedException = ExpectedException.none();
private PartialHashSegmentGenerateTask target;
@Before
@ -102,4 +109,62 @@ public class PartialHashSegmentGenerateTaskTest
Assert.assertEquals(expectedNumBuckets, partitionAnalysis.getBucketAnalysis(interval).intValue());
}
}
@Test
public void testCreateHashPartitionAnalysisFromPartitionsSpecWithNumShardsMap()
{
final List<Interval> intervals = ImmutableList.of(
Intervals.of("2020-01-01/2020-01-02"),
Intervals.of("2020-01-02/2020-01-03"),
Intervals.of("2020-01-03/2020-01-04")
);
final Map<Interval, Integer> intervalToNumShards = ImmutableMap.of(
Intervals.of("2020-01-01/2020-01-02"),
1,
Intervals.of("2020-01-02/2020-01-03"),
2,
Intervals.of("2020-01-03/2020-01-04"),
3
);
final HashPartitionAnalysis partitionAnalysis = PartialHashSegmentGenerateTask
.createHashPartitionAnalysisFromPartitionsSpec(
new UniformGranularitySpec(
Granularities.DAY,
Granularities.NONE,
intervals
),
new HashedPartitionsSpec(null, null, null),
intervalToNumShards
);
Assert.assertEquals(intervals.size(), partitionAnalysis.getNumTimePartitions());
for (Interval interval : intervals) {
Assert.assertEquals(
intervalToNumShards.get(interval).intValue(),
partitionAnalysis.getBucketAnalysis(interval).intValue()
);
}
}
@Test
public void requiresGranularitySpecInputIntervals()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Missing intervals in granularitySpec");
new PartialHashSegmentGenerateTask(
ParallelIndexTestingFactory.AUTOMATIC_ID,
ParallelIndexTestingFactory.GROUP_ID,
ParallelIndexTestingFactory.TASK_RESOURCE,
ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
ParallelIndexTestingFactory.NUM_ATTEMPTS,
ParallelIndexTestingFactory.createIngestionSpec(
new LocalInputSource(new File("baseDir"), "filer"),
new JsonInputFormat(null, null, null),
new ParallelIndexTestingFactory.TuningConfigBuilder().build(),
ParallelIndexTestingFactory.createDataSchema(null)
),
ParallelIndexTestingFactory.CONTEXT,
null
);
}
}

View File

@ -63,17 +63,6 @@ public class PerfectRollupWorkerTaskTest
.build();
}
@Test
public void requiresGranularitySpecInputIntervals()
{
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Missing intervals in granularitySpec");
new PerfectRollupWorkerTaskBuilder()
.granularitySpecInputIntervals(Collections.emptyList())
.build();
}
@Test
public void succeedsWithValidPartitionsSpec()
{

View File

@ -108,15 +108,16 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
0
);
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}")
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}, intervalToIndex={4}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM}, // will spawn subtask
new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM} // expected to fail
new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, null},
new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX}, // will spawn subtask
new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX} // expected to fail
);
}
@ -132,17 +133,21 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
private final int maxNumConcurrentSubTasks;
private final boolean useMultivalueDim;
@Nullable
private final Interval intervalToIndex;
public RangePartitionMultiPhaseParallelIndexingTest(
LockGranularity lockGranularity,
boolean useInputFormatApi,
int maxNumConcurrentSubTasks,
boolean useMultivalueDim
boolean useMultivalueDim,
@Nullable Interval intervalToIndex
)
{
super(lockGranularity, useInputFormatApi);
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
this.useMultivalueDim = useMultivalueDim;
this.intervalToIndex = intervalToIndex;
}
@Before
@ -309,7 +314,7 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
DIMENSIONS_SPEC,
INPUT_FORMAT,
null,
INTERVAL_TO_INDEX,
intervalToIndex,
inputDir,
TEST_FILE_NAME_PREFIX + "*",
partitionsSpec,
@ -323,7 +328,7 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
null,
null,
PARSE_SPEC,
INTERVAL_TO_INDEX,
intervalToIndex,
inputDir,
TEST_FILE_NAME_PREFIX + "*",
partitionsSpec,