From 7f2b6577ef19f18523e8353336ad496e8dc4a270 Mon Sep 17 00:00:00 2001 From: elloooooo Date: Fri, 27 Sep 2019 07:15:24 +0800 Subject: [PATCH] get active task by datasource when supervisor discover tasks (#8450) * get active task by datasource when supervisor discover tasks * fix ut * fix ut * fix ut * remove unnecessary condition check * fix ut * remove stream in hot loop --- .../kafka/supervisor/KafkaSupervisorTest.java | 110 ++++++++---------- .../supervisor/KinesisSupervisorTest.java | 105 +++++++---------- .../overlord/HeapMemoryTaskStorage.java | 27 ++++- .../overlord/MetadataTaskStorage.java | 21 +++- .../druid/indexing/overlord/TaskStorage.java | 33 ++++-- .../supervisor/SeekableStreamSupervisor.java | 5 +- .../SeekableStreamSupervisorStateTest.java | 18 +-- 7 files changed, 172 insertions(+), 147 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 4dfa4722719..759182feac2 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -287,7 +287,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -342,7 +342,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -366,7 +366,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -421,7 +421,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -476,7 +476,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -515,7 +515,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -557,7 +557,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -598,7 +598,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()) @@ -635,7 +635,7 @@ public class KafkaSupervisorTest extends EasyMockSupport addSomeEvents(1); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()) @@ -653,23 +653,11 @@ public class KafkaSupervisorTest extends EasyMockSupport } @Test - public void testDontKillTasksWithMismatchedDatasourceAndType() throws Exception + public void testDontKillTasksWithMismatchedType() throws Exception { supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); - // different datasource (don't kill) - Task id1 = createKafkaIndexTask( - "id1", - "other-datasource", - 2, - new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 10L)), - null, - null, - supervisor.getTuningConfig() - ); - // non KafkaIndexTask (don't kill) Task id2 = new RealtimeIndexTask( "id2", @@ -682,12 +670,12 @@ public class KafkaSupervisorTest extends EasyMockSupport null ); - List existingTasks = ImmutableList.of(id1, id2); + List existingTasks = ImmutableList.of(id2); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) .anyTimes(); @@ -777,7 +765,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -835,7 +823,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) .anyTimes(); @@ -872,7 +860,7 @@ public class KafkaSupervisorTest extends EasyMockSupport // test that running the main loop again checks the status of the tasks that were created and does nothing if they // are all still running EasyMock.reset(taskStorage); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -890,7 +878,7 @@ public class KafkaSupervisorTest extends EasyMockSupport KafkaIndexTask iHaveFailed = (KafkaIndexTask) tasks.get(3); EasyMock.reset(taskStorage); EasyMock.reset(taskQueue); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(imStillAlive).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(imStillAlive).anyTimes(); for (Task task : imStillAlive) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -939,7 +927,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); EasyMock.expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(Status.READING)); @@ -983,7 +971,7 @@ public class KafkaSupervisorTest extends EasyMockSupport .andReturn(Futures.immediateFuture(checkpoints)) .times(1); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes(); EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId())) .andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId()))); EasyMock.expect(taskStorage.getStatus(runningTaskId)) @@ -1026,7 +1014,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) .anyTimes(); @@ -1070,7 +1058,7 @@ public class KafkaSupervisorTest extends EasyMockSupport .andReturn(Futures.immediateFuture(checkpoints2)) .times(2); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -1092,7 +1080,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.reset(taskStorage); EasyMock.reset(taskQueue); EasyMock.reset(taskClient); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(imStillRunning).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(imStillRunning).anyTimes(); for (Task task : imStillRunning) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -1129,7 +1117,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -1151,7 +1139,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue); captured = Capture.newInstance(CaptureType.ALL); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -1242,7 +1230,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(task)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( @@ -1358,7 +1346,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(task)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( @@ -1487,7 +1475,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); @@ -1567,7 +1555,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -1596,7 +1584,7 @@ public class KafkaSupervisorTest extends EasyMockSupport .andReturn(Futures.immediateFuture(checkpoints2)) .times(2); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -1626,7 +1614,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -1660,7 +1648,7 @@ public class KafkaSupervisorTest extends EasyMockSupport .times(2); captured = Capture.newInstance(CaptureType.ALL); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -1712,7 +1700,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -1746,7 +1734,7 @@ public class KafkaSupervisorTest extends EasyMockSupport .times(2); captured = Capture.newInstance(CaptureType.ALL); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -1875,7 +1863,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -1934,7 +1922,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -1959,7 +1947,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -2015,7 +2003,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -2048,7 +2036,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); EasyMock.reset(indexerMetadataStorageCoordinator); @@ -2138,7 +2126,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -2236,7 +2224,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -2346,7 +2334,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -2454,7 +2442,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -2541,7 +2529,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -2603,7 +2591,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -2684,7 +2672,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -2739,7 +2727,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -2775,7 +2763,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -2801,7 +2789,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( null @@ -2950,7 +2938,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) @@ -3019,7 +3007,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 4d2513120f7..a9c182bed79 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -226,7 +226,7 @@ public class KinesisSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( null @@ -294,7 +294,7 @@ public class KinesisSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata(null) ).anyTimes(); @@ -352,7 +352,7 @@ public class KinesisSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( null @@ -429,7 +429,7 @@ public class KinesisSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( null @@ -481,7 +481,7 @@ public class KinesisSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( null @@ -540,7 +540,7 @@ public class KinesisSupervisorTest extends EasyMockSupport Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>( @@ -593,7 +593,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>( @@ -615,7 +615,7 @@ public class KinesisSupervisorTest extends EasyMockSupport } @Test - public void testDontKillTasksWithMismatchedDatasourceAndType() throws Exception + public void testDontKillTasksWithMismatchedType() throws Exception { supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); @@ -634,21 +634,6 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - // different datasource (don't kill) - Task id1 = createKinesisIndexTask( - "id1", - "other-datasource", - 2, - new SeekableStreamStartSequenceNumbers<>( - STREAM, - ImmutableMap.of(SHARD_ID0, "0", SHARD_ID1, "0"), - ImmutableSet.of(SHARD_ID0, SHARD_ID1) - ), - new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "1", SHARD_ID1, "12")), - null, - null - ); - // non KinesisIndexTask (don't kill) Task id2 = new RealtimeIndexTask( "id2", @@ -661,12 +646,12 @@ public class KinesisSupervisorTest extends EasyMockSupport null ); - List existingTasks = ImmutableList.of(id1, id2); + List existingTasks = ImmutableList.of(id2); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) .anyTimes(); @@ -760,7 +745,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -832,7 +817,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) .anyTimes(); @@ -878,7 +863,7 @@ public class KinesisSupervisorTest extends EasyMockSupport // test that running the main loop again checks the status of the tasks that were created and does nothing if they // are all still running EasyMock.reset(taskStorage); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -896,7 +881,7 @@ public class KinesisSupervisorTest extends EasyMockSupport KinesisIndexTask iHaveFailed = (KinesisIndexTask) tasks.get(3); EasyMock.reset(taskStorage); EasyMock.reset(taskQueue); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(imStillAlive).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(imStillAlive).anyTimes(); for (Task task : imStillAlive) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -969,7 +954,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); EasyMock.expect(taskClient.getStatusAsync("id1")) @@ -1020,7 +1005,7 @@ public class KinesisSupervisorTest extends EasyMockSupport .times(1); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes(); EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId())) .andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId()))); EasyMock.expect(taskStorage.getStatus(runningTaskId)) @@ -1080,7 +1065,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) .anyTimes(); @@ -1133,7 +1118,7 @@ public class KinesisSupervisorTest extends EasyMockSupport .times(2); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -1155,7 +1140,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.reset(taskStorage); EasyMock.reset(taskQueue); EasyMock.reset(taskClient); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(imStillRunning).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(imStillRunning).anyTimes(); for (Task task : imStillRunning) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -1206,7 +1191,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata(null) ).anyTimes(); @@ -1226,7 +1211,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue); final Capture secondTasks = Capture.newInstance(CaptureType.ALL); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -1370,7 +1355,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(task)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( @@ -1527,7 +1512,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(task)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( @@ -1696,7 +1681,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); @@ -1825,7 +1810,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( null @@ -1863,7 +1848,7 @@ public class KinesisSupervisorTest extends EasyMockSupport .times(2); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -1907,7 +1892,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( null @@ -1944,7 +1929,7 @@ public class KinesisSupervisorTest extends EasyMockSupport .times(2); captured = Capture.newInstance(CaptureType.ALL); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -2015,7 +2000,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( null @@ -2052,7 +2037,7 @@ public class KinesisSupervisorTest extends EasyMockSupport .times(2); captured = Capture.newInstance(CaptureType.ALL); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); for (Task task : tasks) { EasyMock.expect(taskStorage.getStatus(task.getId())) .andReturn(Optional.of(TaskStatus.running(task.getId()))) @@ -2237,7 +2222,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -2323,7 +2308,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -2353,7 +2338,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -2432,7 +2417,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -2477,7 +2462,7 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); @@ -2607,7 +2592,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -2761,7 +2746,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expectLastCall().anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -2917,7 +2902,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3070,7 +3055,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3162,7 +3147,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)).andReturn(Collections.emptySet()).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3228,7 +3213,7 @@ public class KinesisSupervisorTest extends EasyMockSupport .anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( null @@ -3348,7 +3333,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3426,7 +3411,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -3549,7 +3534,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(task)).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) @@ -3645,7 +3630,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index fe22f82be19..be43cc1e3e6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -194,6 +194,25 @@ public class HeapMemoryTaskStorage implements TaskStorage } } + @Override + public List getActiveTasksByDatasource(String datasource) + { + giant.lock(); + + try { + final ImmutableList.Builder listBuilder = ImmutableList.builder(); + for (Map.Entry entry : tasks.entrySet()) { + if (entry.getValue().getStatus().isRunnable() && entry.getValue().getDataSource().equals(datasource)) { + listBuilder.add(entry.getValue().getTask()); + } + } + return listBuilder.build(); + } + finally { + giant.unlock(); + } + } + @Override public List> getActiveTaskInfo(@Nullable String dataSource) { @@ -241,7 +260,8 @@ public class HeapMemoryTaskStorage implements TaskStorage return maxTaskStatuses == null ? getRecentlyCreatedAlreadyFinishedTaskInfoSince( - DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), + DateTimes.nowUtc() + .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), createdDateDesc ) : getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc); @@ -283,7 +303,10 @@ public class HeapMemoryTaskStorage implements TaskStorage } } - private List> getNRecentlyCreatedAlreadyFinishedTaskInfo(int n, Ordering createdDateDesc) + private List> getNRecentlyCreatedAlreadyFinishedTaskInfo( + int n, + Ordering createdDateDesc + ) { giant.lock(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 186aa18224e..08c1343f80b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -192,10 +192,23 @@ public class MetadataTaskStorage implements TaskStorage // filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module // and don't know what to do with the payload, so we won't be able to make use of it anyway return handler.getActiveTaskInfo(null) - .stream() - .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) - .map(TaskInfo::getTask) - .collect(Collectors.toList()); + .stream() + .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) + .map(TaskInfo::getTask) + .collect(Collectors.toList()); + } + + @Override + public List getActiveTasksByDatasource(String datasource) + { + List> activeTaskInfos = handler.getActiveTaskInfo(datasource); + ImmutableList.Builder tasksBuilder = ImmutableList.builder(); + for (TaskInfo taskInfo : activeTaskInfos) { + if (taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) { + tasksBuilder.add(taskInfo.getTask()); + } + } + return tasksBuilder.build(); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index d8a4806bb57..16810aa2529 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -36,8 +36,9 @@ public interface TaskStorage /** * Adds a task to the storage facility with a particular status. * - * @param task task to add + * @param task task to add * @param status task status + * * @throws EntryExistsException if the task ID already exists */ void insert(Task task, TaskStatus status) throws EntryExistsException; @@ -52,7 +53,8 @@ public interface TaskStorage /** * Persists lock state in the storage facility. - * @param taskid task ID + * + * @param taskid task ID * @param taskLock lock state */ void addLock(String taskid, TaskLock taskLock); @@ -70,7 +72,7 @@ public interface TaskStorage * Removes lock state from the storage facility. It is harmless to keep old locks in the storage facility, but * this method can help reclaim wasted space. * - * @param taskid task ID + * @param taskid task ID * @param taskLock lock state */ void removeLock(String taskid, TaskLock taskLock); @@ -85,10 +87,11 @@ public interface TaskStorage /** * Returns task as stored in the storage facility. If the task ID does not exist, this will return an * absentee Optional. - * + *

* NOTE: This method really feels like it should be combined with {@link #getStatus}. Expect that in the future. * * @param taskid task ID + * * @return optional task */ Optional getTask(String taskid); @@ -98,6 +101,7 @@ public interface TaskStorage * an absentee Optional. * * @param taskid task ID + * * @return task status */ Optional getStatus(String taskid); @@ -108,10 +112,9 @@ public interface TaskStorage /** * Add an action taken by a task to the audit log. * - * @param task task to record action for + * @param task task to record action for * @param taskAction task action to record - * - * @param task action return type + * @param task action return type */ @Deprecated void addAuditLog(Task task, TaskAction taskAction); @@ -120,6 +123,7 @@ public interface TaskStorage * Returns all actions taken by a task. * * @param taskid task ID + * * @return list of task actions */ @Deprecated @@ -133,6 +137,16 @@ public interface TaskStorage */ List getActiveTasks(); + /** + * Returns a list of currently running or pending tasks as stored in the storage facility. No particular order + * is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. + * + * @param datasource datasource + * + * @return list of {@link Task} + */ + List getActiveTasksByDatasource(String datasource); + /** * Returns a list of currently running or pending tasks as stored in the storage facility as {@link TaskInfo}. No * particular order is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. @@ -147,9 +161,9 @@ public interface TaskStorage * of creation. No particular standard of "recent" is guaranteed, and in fact, this method is permitted to simply * return nothing. * - * @param maxTaskStatuses maxTaskStatuses + * @param maxTaskStatuses maxTaskStatuses * @param durationBeforeNow duration - * @param datasource datasource + * @param datasource datasource * * @return list of {@link TaskInfo} */ @@ -163,6 +177,7 @@ public interface TaskStorage * Returns a list of locks for a particular task. * * @param taskid task ID + * * @return list of TaskLocks for the given task */ List getLocks(String taskid); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 0ebff42b675..eef5356d985 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1293,11 +1293,12 @@ public abstract class SeekableStreamSupervisor futureTaskIds = new ArrayList<>(); List> futures = new ArrayList<>(); - List tasks = taskStorage.getActiveTasks(); + List tasks = taskStorage.getActiveTasksByDatasource(dataSource); + final Map taskGroupsToVerify = new HashMap<>(); for (Task task : tasks) { - if (!doesTaskTypeMatchSupervisor(task) || !dataSource.equals(task.getDataSource())) { + if (!doesTaskTypeMatchSupervisor(task)) { continue; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 9d467608747..324932a2df8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -158,7 +158,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); replayAll(); @@ -199,7 +199,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport EasyMock.expect(recordSupplier.getPartitionIds(STREAM)) .andThrow(new StreamException(new IllegalStateException(EXCEPTION_MSG))) .anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); replayAll(); @@ -259,7 +259,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport .andThrow(new StreamException(new IllegalStateException())) .times(3); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); replayAll(); @@ -325,9 +325,9 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).times(3); - EasyMock.expect(taskStorage.getActiveTasks()).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).times(3); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); replayAll(); @@ -401,7 +401,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).times(3); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3); @@ -478,7 +478,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport { EasyMock.expect(spec.isSuspended()).andReturn(true).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); replayAll(); @@ -517,7 +517,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); taskRunner.unregisterListener("testSupervisorId");