add tests

This commit is contained in:
George Wu 2024-11-26 13:04:08 -05:00
parent 1c05c2f5f5
commit b6fc5793cd
2 changed files with 29 additions and 2 deletions

View File

@ -1999,14 +1999,18 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
public boolean canPublishSegments(Integer taskGroupId, String taskId) public boolean canPublishSegments(Integer taskGroupId, String taskId)
{ {
CopyOnWriteArrayList<TaskGroup> pendingCompletionTasksForGroup = pendingCompletionTaskGroups.get(taskGroupId); CopyOnWriteArrayList<TaskGroup> pendingCompletionTasksForGroup = pendingCompletionTaskGroups.get(taskGroupId);
TaskGroup taskGroupToCheck = pendingCompletionTasksForGroup.stream().filter(taskGroup -> taskGroup.taskIds().contains(taskId)).findFirst().orElse(null); if (pendingCompletionTasksForGroup == null) {
if (taskGroupToCheck == null) {
// This function is called by the SegmentTransactionAppendAction. // This function is called by the SegmentTransactionAppendAction.
// This is only triggered after a task has already started publishing so this shouldn't really happen. // This is only triggered after a task has already started publishing so this shouldn't really happen.
// It's okay to just let the task try publishing in this case. // It's okay to just let the task try publishing in this case.
log.warn("Did not find task group [%s] to check for publishing.", taskGroupId); log.warn("Did not find task group [%s] to check for publishing.", taskGroupId);
return true; return true;
} }
TaskGroup taskGroupToCheck = pendingCompletionTasksForGroup.stream().filter(taskGroup -> taskGroup.taskIds().contains(taskId)).findFirst().orElse(null);
if (taskGroupToCheck == null) {
log.warn("Did not find task group [%s] to check for publishing.", taskGroupId);
return true;
}
for (TaskGroup taskGroup : pendingCompletionTasksForGroup) { for (TaskGroup taskGroup : pendingCompletionTasksForGroup) {
if (!taskGroup.startingSequences.equals(taskGroupToCheck.startingSequences)) { if (!taskGroup.startingSequences.equals(taskGroupToCheck.startingSequences)) {

View File

@ -1602,6 +1602,29 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
verifyAll(); verifyAll();
} }
@Test
public void testSupervisorCanPublish()
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
replayAll();
SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
Map<String, Integer> startingPartitions = new HashMap<>();
startingPartitions.put("partition", 0);
Map<String, Integer> checkpointPartitions = new HashMap<>();
checkpointPartitions.put("partition", 10);
Assert.assertTrue(supervisor.canPublishSegments(0, "unknown_task"));
supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_1", startingPartitions);
Assert.assertTrue(supervisor.canPublishSegments(0, "task_1"));
supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_2", checkpointPartitions);
Assert.assertTrue(supervisor.canPublishSegments(0, "task_1"));
Assert.assertFalse(supervisor.canPublishSegments(0, "task_2"));
}
@Test @Test
public void testEmitBothLag() throws Exception public void testEmitBothLag() throws Exception
{ {