Lock count guardrail for parallel single phase/sequential task (#12052)

* Lock count guardrail for parallel single phase/sequential task

* PR comments
This commit is contained in:
Jonathan Wei 2021-12-15 11:12:21 -06:00 committed by GitHub
parent 377edff042
commit 3f79453506
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 292 additions and 24 deletions

View File

@ -41,6 +41,7 @@ import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.InputRowSchemas;
@ -111,9 +112,12 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
private TaskLockHelper taskLockHelper;
private final int maxAllowedLockCount;
protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context)
{
super(id, dataSource, context);
maxAllowedLockCount = -1;
}
protected AbstractBatchIndexTask(
@ -121,10 +125,12 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
@Nullable String groupId,
@Nullable TaskResource taskResource,
String dataSource,
@Nullable Map<String, Object> context
@Nullable Map<String, Object> context,
int maxAllowedLockCount
)
{
super(id, groupId, taskResource, dataSource, context);
this.maxAllowedLockCount = maxAllowedLockCount;
}
/**
@ -404,11 +410,17 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
// Intervals are already condensed to avoid creating too many locks.
// Intervals are also sorted and thus it's safe to compare only the previous interval and current one for dedup.
Interval prev = null;
int locksAcquired = 0;
while (intervalIterator.hasNext()) {
final Interval cur = intervalIterator.next();
if (prev != null && cur.equals(prev)) {
continue;
}
if (maxAllowedLockCount >= 0 && locksAcquired >= maxAllowedLockCount) {
throw new MaxAllowedLocksExceededException(maxAllowedLockCount);
}
prev = cur;
final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur));
if (lock == null) {
@ -417,6 +429,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
if (lock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", cur));
}
locksAcquired++;
}
return true;
}

View File

@ -195,7 +195,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@JacksonInject RetryPolicyFactory retryPolicyFactory
)
{
super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context);
super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context, -1);
Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("ioConfig", ioConfig),
@ -1271,7 +1271,8 @@ public class CompactionTask extends AbstractBatchIndexTask
maxParseExceptions,
maxSavedParseExceptions,
maxColumnsToMerge,
awaitSegmentAvailabilityTimeoutMillis
awaitSegmentAvailabilityTimeoutMillis,
null
);
Preconditions.checkArgument(

View File

@ -194,7 +194,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
ingestionSchema.dataSchema.getDataSource(),
null,
ingestionSchema,
context
context,
-1
);
}
@ -205,7 +206,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
String dataSource,
@Nullable String baseSequenceName,
IndexIngestionSpec ingestionSchema,
Map<String, Object> context
Map<String, Object> context,
int maxAllowedLockCount
)
{
super(
@ -213,7 +215,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
groupId,
resource,
dataSource,
context
context,
maxAllowedLockCount
);
this.baseSequenceName = baseSequenceName == null ? getId() : baseSequenceName;
this.ingestionSchema = ingestionSchema;

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
public class MaxAllowedLocksExceededException extends ISE
{
public MaxAllowedLocksExceededException(int maxAllowedLockCount)
{
super(createMessage(maxAllowedLockCount));
}
private static String createMessage(
int maxAllowedLockCount
)
{
return StringUtils.format(
"Number of locks exceeded maxAllowedLockCount [%s].",
maxAllowedLockCount
);
}
}

View File

@ -40,7 +40,7 @@ public abstract class AbstractBatchSubtask extends AbstractBatchIndexTask
@Nullable Map<String, Object> context
)
{
super(id, groupId, taskResource, dataSource, context);
super(id, groupId, taskResource, dataSource, context, -1);
}
/**

View File

@ -71,6 +71,7 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
private volatile boolean subTaskScheduleAndMonitorStopped;
private volatile TaskMonitor<SubTaskType, SubTaskReportType> taskMonitor;
private volatile String stopReason;
private int nextSpecId = 0;
@ -266,9 +267,10 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
}
@Override
public void stopGracefully()
public void stopGracefully(String stopReason)
{
subTaskScheduleAndMonitorStopped = true;
this.stopReason = stopReason;
stopInternal();
}
@ -420,6 +422,12 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
}
}
@Override
public String getStopReason()
{
return stopReason;
}
String getTaskId()
{
return taskId;
@ -445,6 +453,7 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
return tuningConfig;
}
@VisibleForTesting
TaskToolbox getToolbox()
{

View File

@ -57,6 +57,7 @@ import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner.SubTaskSpecStatus;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistributionMerger;
@ -202,7 +203,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
groupId,
taskResource,
ingestionSchema.getDataSchema().getDataSource(),
context
context,
ingestionSchema.getTuningConfig().getMaxAllowedLockCount()
);
this.ingestionSchema = ingestionSchema;
@ -503,7 +505,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
if (isParallelMode()) {
currentSubTaskHolder = new CurrentSubTaskHolder((currentRunnerObject, taskConfig) -> {
final ParallelIndexTaskRunner runner = (ParallelIndexTaskRunner) currentRunnerObject;
runner.stopGracefully();
runner.stopGracefully(null);
});
} else {
currentSubTaskHolder = new CurrentSubTaskHolder((taskObject, taskConfig) -> {
@ -581,10 +583,15 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
} else {
// there is only success or failure after running....
Preconditions.checkState(state.isFailure(), "Unrecognized state after task is complete[%s]", state);
final String errorMessage = StringUtils.format(
TASK_PHASE_FAILURE_MSG,
parallelSinglePhaseRunner.getName()
);
final String errorMessage;
if (parallelSinglePhaseRunner.getStopReason() != null) {
errorMessage = parallelSinglePhaseRunner.getStopReason();
} else {
errorMessage = StringUtils.format(
TASK_PHASE_FAILURE_MSG,
parallelSinglePhaseRunner.getName()
);
}
taskStatus = TaskStatus.failure(getId(), errorMessage);
}
toolbox.getTaskReportFileWriter().write(
@ -1089,7 +1096,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
getIngestionSchema().getIOConfig(),
convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
),
getContext()
getContext(),
getIngestionSchema().getTuningConfig().getMaxAllowedLockCount()
);
if (currentSubTaskHolder.setTask(sequentialIndexTask)
@ -1220,6 +1228,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
return Response.ok(toolbox.getJsonMapper().writeValueAsBytes(segmentIdentifier)).build();
}
catch (MaxAllowedLocksExceededException malee) {
getCurrentRunner().stopGracefully(malee.getMessage());
return Response.status(Response.Status.BAD_REQUEST).entity(malee.getMessage()).build();
}
catch (IOException | IllegalStateException e) {
return Response.serverError().entity(Throwables.getStackTraceAsString(e)).build();
}

View File

@ -58,7 +58,9 @@ public interface ParallelIndexTaskRunner<SubTaskType extends Task, SubTaskReport
* Stop this runner gracefully. This method is called when the task is killed.
* See {@link org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner#stop}.
*/
void stopGracefully();
void stopGracefully(String stopReason);
String getStopReason();
/**
* {@link SubTaskReport} is the report sent by {@link SubTaskType}s. The subTasks call this method to

View File

@ -47,6 +47,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
private static final int DEFAULT_CHAT_HANDLER_NUM_RETRIES = 5;
private static final int DEFAULT_MAX_NUM_SEGMENTS_TO_MERGE = 100;
private static final int DEFAULT_TOTAL_NUM_MERGE_TASKS = 10;
private static final int DEFAULT_MAX_ALLOWED_LOCK_COUNT = -1;
private final SplitHintSpec splitHintSpec;
@ -71,6 +72,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
*/
private final int totalNumMergeTasks;
private final int maxAllowedLockCount;
public static ParallelIndexTuningConfig defaultConfig()
{
return new ParallelIndexTuningConfig(
@ -103,6 +106,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
null,
null,
null,
null,
null
);
}
@ -138,7 +142,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
@JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis
@JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis,
@JsonProperty("maxAllowedLockCount") @Nullable Integer maxAllowedLockCount
)
{
super(
@ -197,6 +202,10 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
? DEFAULT_TOTAL_NUM_MERGE_TASKS
: totalNumMergeTasks;
this.maxAllowedLockCount = maxAllowedLockCount == null
? DEFAULT_MAX_ALLOWED_LOCK_COUNT
: maxAllowedLockCount;
Preconditions.checkArgument(this.maxNumConcurrentSubTasks > 0, "maxNumConcurrentSubTasks must be positive");
Preconditions.checkArgument(this.maxNumSegmentsToMerge > 0, "maxNumSegmentsToMerge must be positive");
Preconditions.checkArgument(this.totalNumMergeTasks > 0, "totalNumMergeTasks must be positive");
@ -257,6 +266,12 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
return totalNumMergeTasks;
}
@JsonProperty
public int getMaxAllowedLockCount()
{
return maxAllowedLockCount;
}
@Override
public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
{
@ -290,7 +305,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getMaxColumnsToMerge(),
getAwaitSegmentAvailabilityTimeoutMillis()
getAwaitSegmentAvailabilityTimeoutMillis(),
getMaxAllowedLockCount()
);
}
@ -313,6 +329,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
chatHandlerNumRetries == that.chatHandlerNumRetries &&
maxNumSegmentsToMerge == that.maxNumSegmentsToMerge &&
totalNumMergeTasks == that.totalNumMergeTasks &&
maxAllowedLockCount == that.maxAllowedLockCount &&
Objects.equals(splitHintSpec, that.splitHintSpec) &&
Objects.equals(chatHandlerTimeout, that.chatHandlerTimeout);
}
@ -329,7 +346,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
chatHandlerTimeout,
chatHandlerNumRetries,
maxNumSegmentsToMerge,
totalNumMergeTasks
totalNumMergeTasks,
maxAllowedLockCount
);
}
@ -345,6 +363,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
", chatHandlerNumRetries=" + chatHandlerNumRetries +
", maxNumSegmentsToMerge=" + maxNumSegmentsToMerge +
", totalNumMergeTasks=" + totalNumMergeTasks +
", maxAllowedLockCount=" + maxAllowedLockCount +
"} " + super.toString();
}
}

View File

@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@ -343,6 +344,11 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
interval = granularitySpec.getSegmentGranularity().bucket(timestamp);
version = ParallelIndexSupervisorTask.findVersion(versions, interval);
if (version == null) {
final int maxAllowedLockCount = getIngestionSchema().getTuningConfig().getMaxAllowedLockCount();
if (maxAllowedLockCount >= 0 && locks.size() >= maxAllowedLockCount) {
throw new MaxAllowedLocksExceededException(maxAllowedLockCount);
}
// We don't have a lock for this interval, so we should lock it now.
final TaskLock lock = Preconditions.checkNotNull(
getToolbox().getTaskActionClient().submit(

View File

@ -97,7 +97,7 @@ public class TaskMonitor<T extends Task, SubTaskReportType extends SubTaskReport
* This metric is used only for unit tests because the current task status system doesn't track the canceled task
* status. Currently, this metric only represents the number of canceled tasks by {@link ParallelIndexTaskRunner}.
* See {@link #stop()}, {@link ParallelIndexPhaseRunner#run()}, and
* {@link ParallelIndexPhaseRunner#stopGracefully()}.
* {@link ParallelIndexPhaseRunner#stopGracefully(String)} ()}.
*/
private int numCanceledTasks;

View File

@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
@ -288,8 +289,13 @@ public class TaskQueue
}
catch (Exception e) {
log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
final String errorMessage = "Failed while waiting for the task to be ready to run. "
+ "See overlord logs for more details.";
final String errorMessage;
if (e instanceof MaxAllowedLocksExceededException) {
errorMessage = e.getMessage();
} else {
errorMessage = "Failed while waiting for the task to be ready to run. "
+ "See overlord logs for more details.";
}
notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
continue;
}

View File

@ -279,6 +279,7 @@ public class ClientCompactionTaskQuerySerdeTest
null,
null,
null,
null,
null
)
)

View File

@ -320,6 +320,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
null,
null,
null,
null,
null
)
)

View File

@ -748,6 +748,7 @@ public class CompactionTaskTest
null,
null,
null,
null,
null
);

View File

@ -179,6 +179,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
5,
null,
null,
null
);
@ -298,6 +299,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
null,
null,
null,
null
);
}

View File

@ -387,11 +387,17 @@ public class HashPartitionTaskKillTest extends AbstractMultiPhaseParallelIndexin
}
@Override
public void stopGracefully()
public void stopGracefully(String stopReason)
{
}
@Override
public String getStopReason()
{
return null;
}
@Override
public void collectReport(DimensionCardinalityReport report)
{

View File

@ -201,6 +201,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
null,
null,
null,
null,
null
)
);

View File

@ -448,6 +448,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
null,
null,
null,
null,
null
)
);

View File

@ -275,6 +275,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
null,
null,
null,
null,
null
);

View File

@ -251,6 +251,7 @@ public class ParallelIndexSupervisorTaskTest
null,
null,
null,
null,
null
);
final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(

View File

@ -187,6 +187,7 @@ class ParallelIndexTestingFactory
maxParseExceptions,
25,
null,
null,
null
);
}

View File

@ -102,6 +102,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
null,
null
);
final byte[] json = mapper.writeValueAsBytes(tuningConfig);
@ -148,6 +149,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
null,
null
);
final byte[] json = mapper.writeValueAsBytes(tuningConfig);
@ -194,6 +196,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
null,
null
);
final byte[] json = mapper.writeValueAsBytes(tuningConfig);
@ -242,6 +245,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
null,
null
);
}
@ -287,6 +291,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
null,
null
);
}
@ -332,6 +337,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
null,
null
);
}
@ -377,6 +383,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
null,
null
);
}

View File

@ -324,11 +324,17 @@ public class RangePartitionTaskKillTest extends AbstractMultiPhaseParallelIndexi
}
@Override
public void stopGracefully()
public void stopGracefully(String stopReason)
{
}
@Override
public String getStopReason()
{
return null;
}
@Override
public void collectReport(DimensionDistributionReport report)
{

View File

@ -53,7 +53,9 @@ import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -77,6 +79,9 @@ import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
public static Iterable<Object[]> constructorFeeder()
{
@ -592,6 +597,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
null,
null,
null,
null,
null
),
VALID_INPUT_SOURCE_FILTER
@ -677,6 +683,127 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
Assert.assertEquals(new HashSet<>(afterAppendSegments), visibles);
}
@Test
public void testMaxLocksWith1MaxNumConcurrentSubTasks()
{
final Interval interval = Intervals.of("2017-12/P1M");
final boolean appendToExisting = false;
final ParallelIndexSupervisorTask task = newTask(
interval,
Granularities.DAY,
appendToExisting,
true,
new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
1,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
0
),
VALID_INPUT_SOURCE_FILTER
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
if (lockGranularity.equals(LockGranularity.TIME_CHUNK)) {
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"Number of locks exceeded maxAllowedLockCount [0]"
);
getIndexingServiceClient().runAndWait(task);
} else {
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
}
}
@Test
public void testMaxLocksWith2MaxNumConcurrentSubTasks()
{
final Interval interval = Intervals.of("2017-12/P1M");
final boolean appendToExisting = false;
final ParallelIndexSupervisorTask task = newTask(
interval,
Granularities.DAY,
appendToExisting,
true,
new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
2,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
0
),
VALID_INPUT_SOURCE_FILTER
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
if (lockGranularity.equals(LockGranularity.TIME_CHUNK)) {
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"Number of locks exceeded maxAllowedLockCount [0]"
);
getIndexingServiceClient().runAndWait(task);
} else {
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
}
}
private ParallelIndexSupervisorTask newTask(
@Nullable Interval interval,
boolean appendToExisting,