diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 3042d9199f3..d437427a959 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1999,14 +1999,18 @@ public abstract class SeekableStreamSupervisor pendingCompletionTasksForGroup = pendingCompletionTaskGroups.get(taskGroupId); - TaskGroup taskGroupToCheck = pendingCompletionTasksForGroup.stream().filter(taskGroup -> taskGroup.taskIds().contains(taskId)).findFirst().orElse(null); - if (taskGroupToCheck == null) { + if (pendingCompletionTasksForGroup == null) { // This function is called by the SegmentTransactionAppendAction. // This is only triggered after a task has already started publishing so this shouldn't really happen. // It's okay to just let the task try publishing in this case. log.warn("Did not find task group [%s] to check for publishing.", taskGroupId); return true; } + TaskGroup taskGroupToCheck = pendingCompletionTasksForGroup.stream().filter(taskGroup -> taskGroup.taskIds().contains(taskId)).findFirst().orElse(null); + if (taskGroupToCheck == null) { + log.warn("Did not find task group [%s] to check for publishing.", taskGroupId); + return true; + } for (TaskGroup taskGroup : pendingCompletionTasksForGroup) { if (!taskGroup.startingSequences.equals(taskGroupToCheck.startingSequences)) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index af66ce3b8b9..7d8ad815cfc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1602,6 +1602,29 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport verifyAll(); } + @Test + public void testSupervisorCanPublish() + { + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + + replayAll(); + + SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + Map startingPartitions = new HashMap<>(); + startingPartitions.put("partition", 0); + Map checkpointPartitions = new HashMap<>(); + checkpointPartitions.put("partition", 10); + Assert.assertTrue(supervisor.canPublishSegments(0, "unknown_task")); + supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_1", startingPartitions); + Assert.assertTrue(supervisor.canPublishSegments(0, "task_1")); + supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_2", checkpointPartitions); + Assert.assertTrue(supervisor.canPublishSegments(0, "task_1")); + Assert.assertFalse(supervisor.canPublishSegments(0, "task_2")); + } + @Test public void testEmitBothLag() throws Exception {