Fix needless task shutdown on leader switch (#13411)

* Fix needless task shutdown on leader switch

* Add unit test

* Fix style

* Fix UTs
This commit is contained in:
AmatyaAvadhanula 2022-12-01 18:31:08 +05:30 committed by GitHub
parent f6f625ee08
commit cc307e4c29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 3 deletions

View File

@ -4150,7 +4150,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testResumeAllActivelyReadingTasks() throws Exception
{
supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);
supervisor = getTestableSupervisor(2, 3, true, "PT1H", null, null);
// Mock with task based setup for resumeAsync
EasyMock.reset(taskClient);
addSomeEvents(100);
@ -4195,7 +4195,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.getTuningConfig()
);
List<Task> tasks = ImmutableList.of(readingTask, publishingTask, pausedTask, failsToResumePausedTask);
KafkaIndexTask waitingTask = createKafkaIndexTask("waitingTask",
DATASOURCE,
2,
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(2, 0L), Collections.emptySet()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(2, Long.MAX_VALUE)),
null,
null,
supervisor.getTuningConfig()
);
KafkaIndexTask pendingTask = createKafkaIndexTask("pendingTask",
DATASOURCE,
2,
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(2, 0L), Collections.emptySet()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(2, Long.MAX_VALUE)),
null,
null,
supervisor.getTuningConfig()
);
List<Task> tasks = ImmutableList.of(readingTask, publishingTask, pausedTask, failsToResumePausedTask, waitingTask, pendingTask);
Collection taskRunnerWorkItems = ImmutableList.of(
new TestTaskRunnerWorkItem(readingTask, null, new TaskLocation("testHost", 1001, -1)),
new TestTaskRunnerWorkItem(publishingTask, null, new TaskLocation("testHost", 1002, -1)),
@ -4220,6 +4240,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
.andReturn(Optional.of(TaskStatus.running(pausedTask.getId()))).anyTimes();
EasyMock.expect(taskStorage.getStatus(failsToResumePausedTask.getId()))
.andReturn(Optional.of(TaskStatus.running(failsToResumePausedTask.getId()))).anyTimes();
EasyMock.expect(taskStorage.getStatus(waitingTask.getId()))
.andReturn(Optional.of(TaskStatus.running(waitingTask.getId()))).anyTimes();
EasyMock.expect(taskStorage.getStatus(pendingTask.getId()))
.andReturn(Optional.of(TaskStatus.running(pendingTask.getId()))).anyTimes();
EasyMock.expect(taskStorage.getTask(readingTask.getId()))
.andReturn(Optional.of(readingTask)).anyTimes();
@ -4229,6 +4253,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
.andReturn(Optional.of(pausedTask)).anyTimes();
EasyMock.expect(taskStorage.getTask(failsToResumePausedTask.getId()))
.andReturn(Optional.of(failsToResumePausedTask)).anyTimes();
EasyMock.expect(taskStorage.getTask(waitingTask.getId()))
.andReturn(Optional.of(waitingTask)).anyTimes();
EasyMock.expect(taskStorage.getTask(pendingTask.getId()))
.andReturn(Optional.of(pendingTask)).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@ -4237,6 +4265,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true);
EasyMock.expect(taskClient.getStartTimeAsync(waitingTask.getId()))
.andReturn(Futures.immediateFuture(null));
EasyMock.expect(taskClient.getStartTimeAsync(pendingTask.getId()))
.andReturn(Futures.immediateFuture(null));
EasyMock.expect(taskClient.getStatusAsync(readingTask.getId()))
.andReturn(Futures.immediateFuture(Status.READING));
EasyMock.expect(taskClient.getStatusAsync(publishingTask.getId()))
@ -4245,6 +4279,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
.andReturn(Futures.immediateFuture(Status.PAUSED));
EasyMock.expect(taskClient.getStatusAsync(failsToResumePausedTask.getId()))
.andReturn(Futures.immediateFuture(Status.PAUSED));
EasyMock.expect(taskClient.getStatusAsync(waitingTask.getId()))
.andReturn(Futures.immediateFuture(Status.NOT_STARTED));
EasyMock.expect(taskClient.getStatusAsync(pendingTask.getId()))
.andReturn(Futures.immediateFuture(Status.NOT_STARTED));
EasyMock.expect(taskClient.getEndOffsetsAsync(publishingTask.getId()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 0L)));
@ -4258,6 +4296,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskClient.getCheckpointsAsync(failsToResumePausedTask.getId(), true))
.andReturn(Futures.immediateFuture(new TreeMap<>()));
EasyMock.expect(taskClient.getCheckpointsAsync(waitingTask.getId(), true))
.andReturn(Futures.immediateFuture(null));
EasyMock.expect(taskClient.getCheckpointsAsync(pendingTask.getId(), true))
.andReturn(Futures.immediateFuture(null));
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
// Only the active i.e non-publishing tasks are resumed

View File

@ -3102,6 +3102,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
final Collection workItems = new ArrayList();
workItems.add(new TestTaskRunnerWorkItem(id1, null, new TaskLocation(id1.getId(), 8100, 8100)));
workItems.add(new TestTaskRunnerWorkItem(id2, null, new TaskLocation(id2.getId(), 8100, 8100)));
workItems.add(new TestTaskRunnerWorkItem(id3, null, new TaskLocation(id3.getId(), 8100, 8100)));
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems);
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();

View File

@ -2014,9 +2014,24 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
Map<String, ListenableFuture<Boolean>> tasksToResume = new HashMap<>();
if (activelyReadingTaskGroups.isEmpty()) {
return;
}
// Resume only running tasks and not pending / waiting ones.
if (!taskMaster.getTaskRunner().isPresent()) {
return;
}
Set<String> runningTaskIds = taskMaster.getTaskRunner()
.get()
.getRunningTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.collect(Collectors.toSet());
for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
for (String taskId : taskGroup.tasks.keySet()) {
tasksToResume.put(taskId, taskClient.resumeAsync(taskId));
if (runningTaskIds.contains(taskId)) {
tasksToResume.put(taskId, taskClient.resumeAsync(taskId));
}
}
}

View File

@ -331,6 +331,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3);
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(ImmutableList.of()).anyTimes();
replayAll();
@ -564,6 +565,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).times(3);
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(ImmutableList.of()).anyTimes();
replayAll();