diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index ab14f4dac7d..28b915d22cd 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -4150,7 +4150,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testResumeAllActivelyReadingTasks() throws Exception { - supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null); + supervisor = getTestableSupervisor(2, 3, true, "PT1H", null, null); // Mock with task based setup for resumeAsync EasyMock.reset(taskClient); addSomeEvents(100); @@ -4195,7 +4195,27 @@ public class KafkaSupervisorTest extends EasyMockSupport supervisor.getTuningConfig() ); - List tasks = ImmutableList.of(readingTask, publishingTask, pausedTask, failsToResumePausedTask); + KafkaIndexTask waitingTask = createKafkaIndexTask("waitingTask", + DATASOURCE, + 2, + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(2, 0L), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(2, Long.MAX_VALUE)), + null, + null, + supervisor.getTuningConfig() + ); + + KafkaIndexTask pendingTask = createKafkaIndexTask("pendingTask", + DATASOURCE, + 2, + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(2, 0L), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(2, Long.MAX_VALUE)), + null, + null, + supervisor.getTuningConfig() + ); + + List tasks = ImmutableList.of(readingTask, publishingTask, pausedTask, failsToResumePausedTask, waitingTask, pendingTask); Collection taskRunnerWorkItems = ImmutableList.of( new TestTaskRunnerWorkItem(readingTask, null, new TaskLocation("testHost", 1001, -1)), new TestTaskRunnerWorkItem(publishingTask, null, new TaskLocation("testHost", 1002, -1)), @@ -4220,6 +4240,10 @@ public class KafkaSupervisorTest extends EasyMockSupport .andReturn(Optional.of(TaskStatus.running(pausedTask.getId()))).anyTimes(); EasyMock.expect(taskStorage.getStatus(failsToResumePausedTask.getId())) .andReturn(Optional.of(TaskStatus.running(failsToResumePausedTask.getId()))).anyTimes(); + EasyMock.expect(taskStorage.getStatus(waitingTask.getId())) + .andReturn(Optional.of(TaskStatus.running(waitingTask.getId()))).anyTimes(); + EasyMock.expect(taskStorage.getStatus(pendingTask.getId())) + .andReturn(Optional.of(TaskStatus.running(pendingTask.getId()))).anyTimes(); EasyMock.expect(taskStorage.getTask(readingTask.getId())) .andReturn(Optional.of(readingTask)).anyTimes(); @@ -4229,6 +4253,10 @@ public class KafkaSupervisorTest extends EasyMockSupport .andReturn(Optional.of(pausedTask)).anyTimes(); EasyMock.expect(taskStorage.getTask(failsToResumePausedTask.getId())) .andReturn(Optional.of(failsToResumePausedTask)).anyTimes(); + EasyMock.expect(taskStorage.getTask(waitingTask.getId())) + .andReturn(Optional.of(waitingTask)).anyTimes(); + EasyMock.expect(taskStorage.getTask(pendingTask.getId())) + .andReturn(Optional.of(pendingTask)).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( @@ -4237,6 +4265,12 @@ public class KafkaSupervisorTest extends EasyMockSupport ).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true); + + EasyMock.expect(taskClient.getStartTimeAsync(waitingTask.getId())) + .andReturn(Futures.immediateFuture(null)); + EasyMock.expect(taskClient.getStartTimeAsync(pendingTask.getId())) + .andReturn(Futures.immediateFuture(null)); + EasyMock.expect(taskClient.getStatusAsync(readingTask.getId())) .andReturn(Futures.immediateFuture(Status.READING)); EasyMock.expect(taskClient.getStatusAsync(publishingTask.getId())) @@ -4245,6 +4279,10 @@ public class KafkaSupervisorTest extends EasyMockSupport .andReturn(Futures.immediateFuture(Status.PAUSED)); EasyMock.expect(taskClient.getStatusAsync(failsToResumePausedTask.getId())) .andReturn(Futures.immediateFuture(Status.PAUSED)); + EasyMock.expect(taskClient.getStatusAsync(waitingTask.getId())) + .andReturn(Futures.immediateFuture(Status.NOT_STARTED)); + EasyMock.expect(taskClient.getStatusAsync(pendingTask.getId())) + .andReturn(Futures.immediateFuture(Status.NOT_STARTED)); EasyMock.expect(taskClient.getEndOffsetsAsync(publishingTask.getId())) .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 0L))); @@ -4258,6 +4296,12 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskClient.getCheckpointsAsync(failsToResumePausedTask.getId(), true)) .andReturn(Futures.immediateFuture(new TreeMap<>())); + EasyMock.expect(taskClient.getCheckpointsAsync(waitingTask.getId(), true)) + .andReturn(Futures.immediateFuture(null)); + + EasyMock.expect(taskClient.getCheckpointsAsync(pendingTask.getId(), true)) + .andReturn(Futures.immediateFuture(null)); + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); // Only the active i.e non-publishing tasks are resumed diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 50fbe2f721d..265a9fc144f 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -3102,6 +3102,11 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + final Collection workItems = new ArrayList(); + workItems.add(new TestTaskRunnerWorkItem(id1, null, new TaskLocation(id1.getId(), 8100, 8100))); + workItems.add(new TestTaskRunnerWorkItem(id2, null, new TaskLocation(id2.getId(), 8100, 8100))); + workItems.add(new TestTaskRunnerWorkItem(id3, null, new TaskLocation(id3.getId(), 8100, 8100))); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 53769f84985..68cf3bf7964 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2014,9 +2014,24 @@ public abstract class SeekableStreamSupervisor> tasksToResume = new HashMap<>(); + if (activelyReadingTaskGroups.isEmpty()) { + return; + } + // Resume only running tasks and not pending / waiting ones. + if (!taskMaster.getTaskRunner().isPresent()) { + return; + } + Set runningTaskIds = taskMaster.getTaskRunner() + .get() + .getRunningTasks() + .stream() + .map(TaskRunnerWorkItem::getTaskId) + .collect(Collectors.toSet()); for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { for (String taskId : taskGroup.tasks.keySet()) { - tasksToResume.put(taskId, taskClient.resumeAsync(taskId)); + if (runningTaskIds.contains(taskId)) { + tasksToResume.put(taskId, taskClient.resumeAsync(taskId)); + } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index ab3f9c5f9f1..61260221bbe 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -331,6 +331,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(ImmutableList.of()).anyTimes(); replayAll(); @@ -564,6 +565,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).times(3); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(ImmutableList.of()).anyTimes(); replayAll();