From 3f794535060ec2011a5c02653edad298712d35bf Mon Sep 17 00:00:00 2001 From: Jonathan Wei Date: Wed, 15 Dec 2021 11:12:21 -0600 Subject: [PATCH] Lock count guardrail for parallel single phase/sequential task (#12052) * Lock count guardrail for parallel single phase/sequential task * PR comments --- .../common/task/AbstractBatchIndexTask.java | 15 ++- .../indexing/common/task/CompactionTask.java | 5 +- .../druid/indexing/common/task/IndexTask.java | 9 +- .../MaxAllowedLocksExceededException.java | 41 ++++++ .../batch/parallel/AbstractBatchSubtask.java | 2 +- .../parallel/ParallelIndexPhaseRunner.java | 11 +- .../parallel/ParallelIndexSupervisorTask.java | 26 +++- .../parallel/ParallelIndexTaskRunner.java | 4 +- .../parallel/ParallelIndexTuningConfig.java | 25 +++- .../SinglePhaseParallelIndexTaskRunner.java | 6 + .../task/batch/parallel/TaskMonitor.java | 2 +- .../druid/indexing/overlord/TaskQueue.java | 10 +- .../ClientCompactionTaskQuerySerdeTest.java | 1 + .../common/task/CompactionTaskRunTest.java | 1 + .../common/task/CompactionTaskTest.java | 1 + ...stractParallelIndexSupervisorTaskTest.java | 2 + .../parallel/HashPartitionTaskKillTest.java | 8 +- .../ParallelIndexSupervisorTaskKillTest.java | 1 + ...rallelIndexSupervisorTaskResourceTest.java | 1 + .../ParallelIndexSupervisorTaskSerdeTest.java | 1 + .../ParallelIndexSupervisorTaskTest.java | 1 + .../parallel/ParallelIndexTestingFactory.java | 1 + .../ParallelIndexTuningConfigTest.java | 7 + .../parallel/RangePartitionTaskKillTest.java | 8 +- .../SinglePhaseParallelIndexingTest.java | 127 ++++++++++++++++++ 25 files changed, 292 insertions(+), 24 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/MaxAllowedLocksExceededException.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index df479a5663a..ce2ab80c67b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -41,6 +41,7 @@ import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException; import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.indexing.firehose.WindowedSegmentId; import org.apache.druid.indexing.input.InputRowSchemas; @@ -111,9 +112,12 @@ public abstract class AbstractBatchIndexTask extends AbstractTask private TaskLockHelper taskLockHelper; + private final int maxAllowedLockCount; + protected AbstractBatchIndexTask(String id, String dataSource, Map context) { super(id, dataSource, context); + maxAllowedLockCount = -1; } protected AbstractBatchIndexTask( @@ -121,10 +125,12 @@ public abstract class AbstractBatchIndexTask extends AbstractTask @Nullable String groupId, @Nullable TaskResource taskResource, String dataSource, - @Nullable Map context + @Nullable Map context, + int maxAllowedLockCount ) { super(id, groupId, taskResource, dataSource, context); + this.maxAllowedLockCount = maxAllowedLockCount; } /** @@ -404,11 +410,17 @@ public abstract class AbstractBatchIndexTask extends AbstractTask // Intervals are already condensed to avoid creating too many locks. // Intervals are also sorted and thus it's safe to compare only the previous interval and current one for dedup. Interval prev = null; + int locksAcquired = 0; while (intervalIterator.hasNext()) { final Interval cur = intervalIterator.next(); if (prev != null && cur.equals(prev)) { continue; } + + if (maxAllowedLockCount >= 0 && locksAcquired >= maxAllowedLockCount) { + throw new MaxAllowedLocksExceededException(maxAllowedLockCount); + } + prev = cur; final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur)); if (lock == null) { @@ -417,6 +429,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask if (lock.isRevoked()) { throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", cur)); } + locksAcquired++; } return true; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 13af7047989..c031d082a30 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -195,7 +195,7 @@ public class CompactionTask extends AbstractBatchIndexTask @JacksonInject RetryPolicyFactory retryPolicyFactory ) { - super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context); + super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context, -1); Checks.checkOneNotNullOrEmpty( ImmutableList.of( new Property<>("ioConfig", ioConfig), @@ -1271,7 +1271,8 @@ public class CompactionTask extends AbstractBatchIndexTask maxParseExceptions, maxSavedParseExceptions, maxColumnsToMerge, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + null ); Preconditions.checkArgument( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index d99013dac62..e1b6fd88b27 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -194,7 +194,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler ingestionSchema.dataSchema.getDataSource(), null, ingestionSchema, - context + context, + -1 ); } @@ -205,7 +206,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler String dataSource, @Nullable String baseSequenceName, IndexIngestionSpec ingestionSchema, - Map context + Map context, + int maxAllowedLockCount ) { super( @@ -213,7 +215,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler groupId, resource, dataSource, - context + context, + maxAllowedLockCount ); this.baseSequenceName = baseSequenceName == null ? getId() : baseSequenceName; this.ingestionSchema = ingestionSchema; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/MaxAllowedLocksExceededException.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/MaxAllowedLocksExceededException.java new file mode 100644 index 00000000000..0d0fb3aee94 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/MaxAllowedLocksExceededException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task.batch; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; + +public class MaxAllowedLocksExceededException extends ISE +{ + public MaxAllowedLocksExceededException(int maxAllowedLockCount) + { + super(createMessage(maxAllowedLockCount)); + } + + private static String createMessage( + int maxAllowedLockCount + ) + { + return StringUtils.format( + "Number of locks exceeded maxAllowedLockCount [%s].", + maxAllowedLockCount + ); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java index c270613ac4d..9b721ada5bd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java @@ -40,7 +40,7 @@ public abstract class AbstractBatchSubtask extends AbstractBatchIndexTask @Nullable Map context ) { - super(id, groupId, taskResource, dataSource, context); + super(id, groupId, taskResource, dataSource, context, -1); } /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java index 890cb90fd3d..393b6182645 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java @@ -71,6 +71,7 @@ public abstract class ParallelIndexPhaseRunner taskMonitor; + private volatile String stopReason; private int nextSpecId = 0; @@ -266,9 +267,10 @@ public abstract class ParallelIndexPhaseRunner { final ParallelIndexTaskRunner runner = (ParallelIndexTaskRunner) currentRunnerObject; - runner.stopGracefully(); + runner.stopGracefully(null); }); } else { currentSubTaskHolder = new CurrentSubTaskHolder((taskObject, taskConfig) -> { @@ -581,10 +583,15 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen } else { // there is only success or failure after running.... Preconditions.checkState(state.isFailure(), "Unrecognized state after task is complete[%s]", state); - final String errorMessage = StringUtils.format( - TASK_PHASE_FAILURE_MSG, - parallelSinglePhaseRunner.getName() - ); + final String errorMessage; + if (parallelSinglePhaseRunner.getStopReason() != null) { + errorMessage = parallelSinglePhaseRunner.getStopReason(); + } else { + errorMessage = StringUtils.format( + TASK_PHASE_FAILURE_MSG, + parallelSinglePhaseRunner.getName() + ); + } taskStatus = TaskStatus.failure(getId(), errorMessage); } toolbox.getTaskReportFileWriter().write( @@ -1089,7 +1096,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen getIngestionSchema().getIOConfig(), convertToIndexTuningConfig(getIngestionSchema().getTuningConfig()) ), - getContext() + getContext(), + getIngestionSchema().getTuningConfig().getMaxAllowedLockCount() ); if (currentSubTaskHolder.setTask(sequentialIndexTask) @@ -1220,6 +1228,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen return Response.ok(toolbox.getJsonMapper().writeValueAsBytes(segmentIdentifier)).build(); } + catch (MaxAllowedLocksExceededException malee) { + getCurrentRunner().stopGracefully(malee.getMessage()); + return Response.status(Response.Status.BAD_REQUEST).entity(malee.getMessage()).build(); + } catch (IOException | IllegalStateException e) { return Response.serverError().entity(Throwables.getStackTraceAsString(e)).build(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java index 05103e85c6a..3cf61dac370 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java @@ -58,7 +58,9 @@ public interface ParallelIndexTaskRunner 0, "maxNumConcurrentSubTasks must be positive"); Preconditions.checkArgument(this.maxNumSegmentsToMerge > 0, "maxNumSegmentsToMerge must be positive"); Preconditions.checkArgument(this.totalNumMergeTasks > 0, "totalNumMergeTasks must be positive"); @@ -257,6 +266,12 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig return totalNumMergeTasks; } + @JsonProperty + public int getMaxAllowedLockCount() + { + return maxAllowedLockCount; + } + @Override public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) { @@ -290,7 +305,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxColumnsToMerge(), - getAwaitSegmentAvailabilityTimeoutMillis() + getAwaitSegmentAvailabilityTimeoutMillis(), + getMaxAllowedLockCount() ); } @@ -313,6 +329,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig chatHandlerNumRetries == that.chatHandlerNumRetries && maxNumSegmentsToMerge == that.maxNumSegmentsToMerge && totalNumMergeTasks == that.totalNumMergeTasks && + maxAllowedLockCount == that.maxAllowedLockCount && Objects.equals(splitHintSpec, that.splitHintSpec) && Objects.equals(chatHandlerTimeout, that.chatHandlerTimeout); } @@ -329,7 +346,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig chatHandlerTimeout, chatHandlerNumRetries, maxNumSegmentsToMerge, - totalNumMergeTasks + totalNumMergeTasks, + maxAllowedLockCount ); } @@ -345,6 +363,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig ", chatHandlerNumRetries=" + chatHandlerNumRetries + ", maxNumSegmentsToMerge=" + maxNumSegmentsToMerge + ", totalNumMergeTasks=" + totalNumMergeTasks + + ", maxAllowedLockCount=" + maxAllowedLockCount + "} " + super.toString(); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java index df9673a28d7..ad68400a567 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java @@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException; import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -343,6 +344,11 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner interval = granularitySpec.getSegmentGranularity().bucket(timestamp); version = ParallelIndexSupervisorTask.findVersion(versions, interval); if (version == null) { + final int maxAllowedLockCount = getIngestionSchema().getTuningConfig().getMaxAllowedLockCount(); + if (maxAllowedLockCount >= 0 && locks.size() >= maxAllowedLockCount) { + throw new MaxAllowedLocksExceededException(maxAllowedLockCount); + } + // We don't have a lock for this interval, so we should lock it now. final TaskLock lock = Preconditions.checkNotNull( getToolbox().getTaskActionClient().submit( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java index 7fe92cb4262..fe55db83830 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java @@ -97,7 +97,7 @@ public class TaskMonitor constructorFeeder() { @@ -592,6 +597,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv null, null, null, + null, null ), VALID_INPUT_SOURCE_FILTER @@ -677,6 +683,127 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv Assert.assertEquals(new HashSet<>(afterAppendSegments), visibles); } + @Test + public void testMaxLocksWith1MaxNumConcurrentSubTasks() + { + final Interval interval = Intervals.of("2017-12/P1M"); + final boolean appendToExisting = false; + final ParallelIndexSupervisorTask task = newTask( + interval, + Granularities.DAY, + appendToExisting, + true, + new ParallelIndexTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + 1, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + 0 + ), + VALID_INPUT_SOURCE_FILTER + ); + task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); + + if (lockGranularity.equals(LockGranularity.TIME_CHUNK)) { + expectedException.expect(RuntimeException.class); + expectedException.expectMessage( + "Number of locks exceeded maxAllowedLockCount [0]" + ); + getIndexingServiceClient().runAndWait(task); + } else { + Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); + Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner()); + assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList()); + } + } + + + @Test + public void testMaxLocksWith2MaxNumConcurrentSubTasks() + { + final Interval interval = Intervals.of("2017-12/P1M"); + final boolean appendToExisting = false; + final ParallelIndexSupervisorTask task = newTask( + interval, + Granularities.DAY, + appendToExisting, + true, + new ParallelIndexTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + 2, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + 0 + ), + VALID_INPUT_SOURCE_FILTER + ); + task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); + + if (lockGranularity.equals(LockGranularity.TIME_CHUNK)) { + expectedException.expect(RuntimeException.class); + expectedException.expectMessage( + "Number of locks exceeded maxAllowedLockCount [0]" + ); + getIndexingServiceClient().runAndWait(task); + } else { + Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); + Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner()); + assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList()); + } + } + private ParallelIndexSupervisorTask newTask( @Nullable Interval interval, boolean appendToExisting,