diff --git a/api/src/main/java/io/druid/indexer/TaskStatusPlus.java b/api/src/main/java/io/druid/indexer/TaskStatusPlus.java index 8e236f8b149..d8d93d6055d 100644 --- a/api/src/main/java/io/druid/indexer/TaskStatusPlus.java +++ b/api/src/main/java/io/druid/indexer/TaskStatusPlus.java @@ -36,6 +36,7 @@ public class TaskStatusPlus private final TaskState state; private final Long duration; private final TaskLocation location; + private final String dataSource; @JsonCreator public TaskStatusPlus( @@ -45,7 +46,8 @@ public class TaskStatusPlus @JsonProperty("queueInsertionTime") DateTime queueInsertionTime, @JsonProperty("statusCode") @Nullable TaskState state, @JsonProperty("duration") @Nullable Long duration, - @JsonProperty("location") TaskLocation location + @JsonProperty("location") TaskLocation location, + @JsonProperty("dataSource") String dataSource ) { if (state != null && state.isComplete()) { @@ -58,6 +60,7 @@ public class TaskStatusPlus this.state = state; this.duration = duration; this.location = Preconditions.checkNotNull(location, "location"); + this.dataSource = dataSource; } @JsonProperty @@ -143,4 +146,11 @@ public class TaskStatusPlus { return Objects.hash(id, type, createdTime, queueInsertionTime, state, duration, location); } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + } diff --git a/api/src/test/java/io/druid/indexer/TaskStatusPlusTest.java b/api/src/test/java/io/druid/indexer/TaskStatusPlusTest.java index 49dae986792..200a6b4c701 100644 --- a/api/src/test/java/io/druid/indexer/TaskStatusPlusTest.java +++ b/api/src/test/java/io/druid/indexer/TaskStatusPlusTest.java @@ -52,7 +52,8 @@ public class TaskStatusPlusTest DateTimes.nowUtc(), TaskState.RUNNING, 1000L, - TaskLocation.create("testHost", 1010, -1) + TaskLocation.create("testHost", 1010, -1), + "ds_test" ); final String json = mapper.writeValueAsString(status); Assert.assertEquals(status, mapper.readValue(json, TaskStatusPlus.class)); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 8364153b181..6513f65e39b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2060,12 +2060,14 @@ public class KafkaSupervisorTest extends EasyMockSupport { private final String taskType; private final TaskLocation location; + private final String dataSource; public TestTaskRunnerWorkItem(Task task, ListenableFuture result, TaskLocation location) { super(task.getId(), result); this.taskType = task.getType(); this.location = location; + this.dataSource = task.getDataSource(); } @Override @@ -2079,6 +2081,13 @@ public class KafkaSupervisorTest extends EasyMockSupport { return taskType; } + + @Override + public String getDataSource() + { + return dataSource; + } + } private static class TestableKafkaSupervisor extends KafkaSupervisor diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 05faf550a95..0440bf909d3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -772,6 +772,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer { return task.getType(); } + + @Override + public String getDataSource() + { + return task.getDataSource(); + } } private static class ProcessHolder diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index e9e623cc801..7745812e4e9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -628,7 +628,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer task.getId(), task.getType(), null, - null + null, + task.getDataSource() ); pendingTaskPayloads.put(task.getId(), task); pendingTasks.put(task.getId(), taskRunnerWorkItem); @@ -966,7 +967,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer taskId, announcement.getTaskType(), zkWorker.getWorker(), - TaskLocation.unknown() + TaskLocation.unknown(), + runningTasks.get(taskId).getDataSource() ); final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent( taskId, diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java index a62990f1678..c4958084c95 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java @@ -31,6 +31,7 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem { private final SettableFuture result; private String taskType; + private final String dataSource; private Worker worker; private TaskLocation location; @@ -38,10 +39,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem String taskId, String taskType, Worker worker, - TaskLocation location + TaskLocation location, + String dataSource ) { - this(taskId, taskType, SettableFuture.create(), worker, location); + this(taskId, taskType, SettableFuture.create(), worker, location, dataSource); } private RemoteTaskRunnerWorkItem( @@ -49,7 +51,8 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem String taskType, SettableFuture result, Worker worker, - TaskLocation location + TaskLocation location, + String dataSource ) { super(taskId, result); @@ -57,6 +60,7 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem this.taskType = taskType; this.worker = worker; this.location = location == null ? TaskLocation.unknown() : location; + this.dataSource = dataSource; } private RemoteTaskRunnerWorkItem( @@ -66,7 +70,8 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem DateTime createdTime, DateTime queueInsertionTime, Worker worker, - TaskLocation location + TaskLocation location, + String dataSource ) { super(taskId, result, createdTime, queueInsertionTime); @@ -74,6 +79,7 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem this.taskType = taskType; this.worker = worker; this.location = location == null ? TaskLocation.unknown() : location; + this.dataSource = dataSource; } public void setLocation(TaskLocation location) @@ -97,6 +103,12 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem { return taskType; } + + @Override + public String getDataSource() + { + return dataSource; + } public void setWorker(Worker worker) { @@ -115,7 +127,7 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time) { - return new RemoteTaskRunnerWorkItem(getTaskId(), taskType, result, getCreatedTime(), time, worker, location); + return new RemoteTaskRunnerWorkItem(getTaskId(), taskType, result, getCreatedTime(), time, worker, location, dataSource); } public RemoteTaskRunnerWorkItem withWorker(Worker theWorker, TaskLocation location) @@ -127,7 +139,8 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem getCreatedTime(), getQueueInsertionTime(), theWorker, - location + location, + dataSource ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java index f2bbc4d21bb..9d0ffe22107 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java @@ -93,6 +93,7 @@ public abstract class TaskRunnerWorkItem */ @Nullable public abstract String getTaskType(); + public abstract String getDataSource(); @Override public String toString() diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java index 469a5e7bff0..cac53e7d0d3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -97,4 +97,10 @@ public class TaskStorageQueryAdapter } return segments; } + + public Pair getCreatedDateAndDataSource(String taskId) + { + return storage.getCreatedDateTimeAndDataSource(taskId); + } + } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 48eeb7f73ff..06e6342356b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -417,6 +417,12 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker { return task.getType(); } + + @Override + public String getDataSource() + { + return task.getDataSource(); + } } private class ThreadPoolTaskRunnerCallable implements Callable diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 14b7dacbdc1..b28eaa0ea12 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1343,7 +1343,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer State state ) { - super(taskId, taskType, worker, location); + super(taskId, task == null ? null : task.getType(), worker, location, task == null ? null : task.getDataSource()); this.state = Preconditions.checkNotNull(state); Preconditions.checkArgument(task == null || taskType == null || taskType.equals(task.getType())); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 21dcaa15aac..8be1dcdbe95 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -55,6 +55,7 @@ import io.druid.indexing.overlord.http.security.TaskResourceFilter; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.Pair; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import io.druid.metadata.EntryExistsException; @@ -70,6 +71,7 @@ import io.druid.server.security.ResourceAction; import io.druid.server.security.ResourceType; import io.druid.tasklogs.TaskLogStreamer; import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; import org.joda.time.Interval; import javax.servlet.http.HttpServletRequest; @@ -460,7 +462,8 @@ public class OverlordResource new WaitingTask( task.getId(), task.getType(), - SettableFuture.create() + SettableFuture.create(), + task.getDataSource() ) { @Override @@ -481,15 +484,18 @@ public class OverlordResource private static class WaitingTask extends TaskRunnerWorkItem { private final String taskType; + private final String dataSource; WaitingTask( String taskId, String taskType, - ListenableFuture result + ListenableFuture result, + String dataSource ) { super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH); this.taskType = taskType; + this.dataSource = dataSource; } @Override @@ -503,6 +509,12 @@ public class OverlordResource { return taskType; } + + @Override + public String getDataSource() + { + return dataSource; + } } @GET @@ -595,20 +607,22 @@ public class OverlordResource ) ); - final List completeTasks = recentlyFinishedTasks - .stream() - .map(status -> new TaskStatusPlus( - status.getId(), - taskFunction.apply(status.getId()).getType(), - taskStorageQueryAdapter.getCreatedTime(status.getId()), - // Would be nice to include the real queue insertion time, but the TaskStorage API doesn't yet allow it. - DateTimes.EPOCH, - status.getStatusCode(), - status.getDuration(), - TaskLocation.unknown() - ) - ) - .collect(Collectors.toList()); + final List completeTasks = Lists.newArrayList(Iterables.transform( + recentlyFinishedTasks, + status -> { + final Pair pair = taskStorageQueryAdapter.getCreatedDateAndDataSource(status.getId()); + return new TaskStatusPlus( + status.getId(), + taskFunction.apply(status.getId()).getType(), + pair.lhs, + // Would be nice to include the real queue insertion time, but the + // TaskStorage API doesn't yet allow it. + DateTimes.EPOCH, + status.getStatusCode(), + status.getDuration(), + TaskLocation.unknown(), + pair.rhs); + })); return Response.ok(completeTasks).build(); } @@ -718,6 +732,26 @@ public class OverlordResource } } + @GET + @Path("/dataSources/{dataSource}") + @Produces(MediaType.APPLICATION_JSON) + public Response getRunningTasksByDataSource(@PathParam("dataSource") String dataSource, + @Context HttpServletRequest request) + { + Optional ts = taskMaster.getTaskRunner(); + if (!ts.isPresent()) { + return Response.status(Response.Status.NOT_FOUND).entity("No tasks are running").build(); + } + Collection runningTasks = ts.get().getRunningTasks(); + if (runningTasks == null || runningTasks.isEmpty()) { + return Response.status(Response.Status.NOT_FOUND) + .entity("No running tasks found for the datasource : " + dataSource).build(); + } + List taskRunnerWorkItemList = runningTasks.stream() + .filter(task -> dataSource.equals(task.getDataSource())).collect(Collectors.toList()); + return Response.ok(taskRunnerWorkItemList).build(); + } + private Response workItemsResponse(final Function> fn) { return asLeaderWith( @@ -742,7 +776,8 @@ public class OverlordResource workItem.getQueueInsertionTime(), null, null, - workItem.getLocation() + workItem.getLocation(), + workItem.getDataSource() ); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index a681c2b8898..816403de645 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -43,7 +43,6 @@ import io.druid.indexing.worker.Worker; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.StringUtils; import org.apache.curator.framework.CuratorFramework; -import org.apache.zookeeper.CreateMode; import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.After; @@ -321,22 +320,41 @@ public class RemoteTaskRunnerTest @Test public void testBootstrap() throws Exception { - cf.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.EPHEMERAL) - .forPath(joiner.join(statusPath, "first"), jsonMapper.writeValueAsBytes(TaskStatus.running("first"))); - cf.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.EPHEMERAL) - .forPath(joiner.join(statusPath, "second"), jsonMapper.writeValueAsBytes(TaskStatus.running("second"))); + Period timeoutPeriod = Period.millis(1000); + makeWorker(); - doSetup(); + RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod); + rtrConfig.setMaxPercentageBlacklistWorkers(100); - final Set existingTasks = Sets.newHashSet(); - for (ImmutableWorkerInfo workerInfo : remoteTaskRunner.getWorkers()) { - existingTasks.addAll(workerInfo.getRunningTasks()); - } - Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks); + makeRemoteTaskRunner(rtrConfig); + + TestRealtimeTask task1 = new TestRealtimeTask( + "first", + new TaskResource("first", 1), + "foo", + TaskStatus.running("first"), + jsonMapper); + remoteTaskRunner.run(task1); + Assert.assertTrue(taskAnnounced(task1.getId())); + mockWorkerRunningTask(task1); + + TestRealtimeTask task = new TestRealtimeTask( + "second", + new TaskResource("task", 2), + "foo", + TaskStatus.running("task"), + jsonMapper); + remoteTaskRunner.run(task); + + TestRealtimeTask task2 = new TestRealtimeTask( + "second", + new TaskResource("second", 2), + "foo", + TaskStatus.running("second"), + jsonMapper); + remoteTaskRunner.run(task2); + Assert.assertTrue(taskAnnounced(task2.getId())); + mockWorkerRunningTask(task2); final Set runningTasks = Sets.newHashSet( Iterables.transform( @@ -357,18 +375,19 @@ public class RemoteTaskRunnerTest @Test public void testRunWithTaskComplete() throws Exception { - cf.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.EPHEMERAL) - .forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(TaskStatus.success(task.getId()))); - doSetup(); + TestRealtimeTask task1 = new TestRealtimeTask( + "testTask", + new TaskResource("testTask", 2), + "foo", + TaskStatus.success("testTask"), + jsonMapper); + remoteTaskRunner.run(task1); + Assert.assertTrue(taskAnnounced(task1.getId())); + mockWorkerRunningTask(task1); + mockWorkerCompleteSuccessfulTask(task1); - ListenableFuture future = remoteTaskRunner.run(task); - - TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - - Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); + Assert.assertEquals(TaskState.SUCCESS, remoteTaskRunner.run(task1).get().getStatusCode()); } @Test @@ -621,11 +640,11 @@ public class RemoteTaskRunnerTest @Test public void testSortByInsertionTime() throws Exception { - RemoteTaskRunnerWorkItem item1 = new RemoteTaskRunnerWorkItem("b", "t", null, null) + RemoteTaskRunnerWorkItem item1 = new RemoteTaskRunnerWorkItem("b", "t", null, null, "ds_test") .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:03Z")); - RemoteTaskRunnerWorkItem item2 = new RemoteTaskRunnerWorkItem("a", "t", null, null) + RemoteTaskRunnerWorkItem item2 = new RemoteTaskRunnerWorkItem("a", "t", null, null, "ds_test") .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:02Z")); - RemoteTaskRunnerWorkItem item3 = new RemoteTaskRunnerWorkItem("c", "t", null, null) + RemoteTaskRunnerWorkItem item3 = new RemoteTaskRunnerWorkItem("c", "t", null, null, "ds_test") .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:01Z")); ArrayList workItems = Lists.newArrayList(item1, item2, item3); RemoteTaskRunner.sortByInsertionTime(workItems); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java index 31dc26c07c9..1d24d1ba2ed 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java @@ -339,7 +339,8 @@ public class PendingTaskBasedProvisioningStrategyTest testTask.getId(), testTask.getType(), null, - TaskLocation.unknown() + TaskLocation.unknown(), + testTask.getDataSource() ).withQueueInsertionTime(DateTimes.nowUtc()) ) ).times(2); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleProvisioningStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleProvisioningStrategyTest.java index d3989507e87..5b9179cf2db 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleProvisioningStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleProvisioningStrategyTest.java @@ -125,7 +125,7 @@ public class SimpleProvisioningStrategyTest RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) + new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource()) .withQueueInsertionTime(DateTimes.nowUtc()) ) ); @@ -163,7 +163,7 @@ public class SimpleProvisioningStrategyTest RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) + new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource()) .withQueueInsertionTime(DateTimes.nowUtc()) ) ).times(2); @@ -222,7 +222,7 @@ public class SimpleProvisioningStrategyTest RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) + new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource()) .withQueueInsertionTime(DateTimes.nowUtc()) ) ).times(2); @@ -275,7 +275,7 @@ public class SimpleProvisioningStrategyTest RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) + new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource()) .withQueueInsertionTime(DateTimes.nowUtc()) ) ).times(2); @@ -316,7 +316,7 @@ public class SimpleProvisioningStrategyTest RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) + new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource()) .withQueueInsertionTime(DateTimes.nowUtc()) ) ).times(2); @@ -364,7 +364,7 @@ public class SimpleProvisioningStrategyTest RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) + new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource()) .withQueueInsertionTime(DateTimes.nowUtc()) ) ).times(2); @@ -468,7 +468,7 @@ public class SimpleProvisioningStrategyTest RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) + new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource()) .withQueueInsertionTime(DateTimes.nowUtc()) ) ).times(2); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index 3fc45c2f6b9..3e6862d502c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -40,6 +40,7 @@ import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.java.util.common.DateTimes; import io.druid.segment.TestHelper; +import io.druid.java.util.common.Pair; import io.druid.server.security.Access; import io.druid.server.security.Action; import io.druid.server.security.AuthConfig; @@ -49,7 +50,9 @@ import io.druid.server.security.AuthorizerMapper; import io.druid.server.security.ForbiddenException; import io.druid.server.security.Resource; import org.easymock.EasyMock; +import org.joda.time.DateTime; import org.joda.time.Interval; +import org.joda.time.chrono.ISOChronology; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -200,7 +203,32 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); List tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(null)).andReturn( + EasyMock.> expect(taskRunner.getRunningTasks()).andReturn( + ImmutableList.of( + new MockTaskRunnerWorkItem(tasksIds.get(0), null), + new MockTaskRunnerWorkItem(tasksIds.get(1), null), + new MockTaskRunnerWorkItem(tasksIds.get(2), null))); + + EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(0))).andStubReturn( + Optional.of(getTaskWithIdAndDatasource(tasksIds.get(0), "deny"))); + EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(1))).andStubReturn( + Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow"))); + EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(2))).andStubReturn( + Optional.of(getTaskWithIdAndDatasource(tasksIds.get(2), "allow"))); + EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(tasksIds.get(0))) + .andStubReturn(DateTimes.EPOCH); + EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(tasksIds.get(1))) + .andStubReturn(DateTimes.EPOCH); + EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(tasksIds.get(2))) + .andStubReturn(DateTimes.EPOCH); + EasyMock.expect(taskStorageQueryAdapter.getCreatedDateAndDataSource(tasksIds.get(0))) + .andStubReturn(new Pair<>(DateTime.now(ISOChronology.getInstanceUTC()), "ds_test")); + EasyMock.expect(taskStorageQueryAdapter.getCreatedDateAndDataSource(tasksIds.get(1))) + .andStubReturn(new Pair<>(DateTime.now(ISOChronology.getInstanceUTC()), "ds_test")); + EasyMock.expect(taskStorageQueryAdapter.getCreatedDateAndDataSource(tasksIds.get(2))) + .andStubReturn(new Pair<>(DateTime.now(ISOChronology.getInstanceUTC()), "ds_test")); + + EasyMock.expect(taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(null)).andStubReturn( Lists.transform( tasksIds, new Function() @@ -210,36 +238,14 @@ public class OverlordResourceTest { return TaskStatus.success(input); } - } - ) - ).once(); + })); - EasyMock.expect(taskStorageQueryAdapter.getTask(EasyMock.eq(tasksIds.get(0)))).andReturn( - Optional.of(getTaskWithIdAndDatasource(tasksIds.get(0), "deny")) - ).once(); - EasyMock.expect(taskStorageQueryAdapter.getTask(EasyMock.eq(tasksIds.get(1)))).andReturn( - Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow")) - ).once(); - EasyMock.expect(taskStorageQueryAdapter.getTask(EasyMock.eq(tasksIds.get(2)))).andReturn( - Optional.of(getTaskWithIdAndDatasource(tasksIds.get(2), "allow")) - ).once(); - EasyMock.expect(taskStorageQueryAdapter.getTask(EasyMock.eq(tasksIds.get(1)))).andReturn( - Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow")) - ).once(); - EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.anyString())) - .andReturn(DateTimes.EPOCH) - .once(); - EasyMock.expect(taskStorageQueryAdapter.getTask(EasyMock.eq(tasksIds.get(2)))).andReturn( - Optional.of(getTaskWithIdAndDatasource(tasksIds.get(2), "allow")) - ).once(); - EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.anyString())) - .andReturn(DateTimes.EPOCH) - .once(); EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); - - List responseObjects = (List) overlordResource.getCompleteTasks(null, req) - .getEntity(); - + Assert.assertTrue(taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(null).size() == 3); + Assert.assertTrue(taskRunner.getRunningTasks().size() == 3); + List responseObjects = (List) overlordResource + .getCompleteTasks(null, req).getEntity(); + Assert.assertEquals(2, responseObjects.size()); Assert.assertEquals(tasksIds.get(1), responseObjects.get(0).getId()); Assert.assertEquals(tasksIds.get(2), responseObjects.get(1).getId()); @@ -361,6 +367,54 @@ public class OverlordResourceTest ); Assert.assertEquals(new TaskStatusResponse("othertask", null), taskStatusResponse2); } + + @Test + public void testGetRunningTasksByDataSource() + { + + List tasksIds = ImmutableList.of("id_1", "id_2"); + EasyMock.> expect(taskRunner.getRunningTasks()).andReturn( + ImmutableList.of( + new MockTaskRunnerWorkItem(tasksIds.get(0), null), + new MockTaskRunnerWorkItem(tasksIds.get(1), null))); + EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(0))).andReturn( + Optional.of(getTaskWithIdAndDatasource(tasksIds.get(0), "deny"))).once(); + EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(1))).andReturn( + Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow"))).once(); + + EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); + List responseObjects = (List) overlordResource.getRunningTasksByDataSource("ds_test", req) + .getEntity(); + + Assert.assertEquals(2, responseObjects.size()); + Assert.assertEquals(taskStorageQueryAdapter.getTask("id_1").get().getId(), responseObjects.get(0).getTaskId()); + Assert.assertEquals(taskStorageQueryAdapter.getTask("id_2").get().getId(), responseObjects.get(1).getTaskId()); + Assert.assertTrue("DataSource Check", "ds_test".equals(responseObjects.get(0).getDataSource())); + } + + @Test + public void testGetRunningTasksByDataSourceNeg() + { + expectAuthorizationTokenCheck(); + + List tasksIds = ImmutableList.of("id_1", "id_2"); + EasyMock.> expect(taskRunner.getRunningTasks()).andReturn( + ImmutableList.of( + new MockTaskRunnerWorkItem(tasksIds.get(0), null), + new MockTaskRunnerWorkItem(tasksIds.get(1), null))); + EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(0))).andReturn( + Optional.of(getTaskWithIdAndDatasource(tasksIds.get(0), "deny"))).once(); + EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(1))).andReturn( + Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow"))).once(); + + EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); + Assert.assertTrue(taskStorageQueryAdapter.getTask("id_1").isPresent()); + Assert.assertTrue(taskStorageQueryAdapter.getTask("id_2").isPresent()); + List responseObjects = (List) overlordResource.getRunningTasksByDataSource("ds_NA", req) + .getEntity(); + + Assert.assertEquals(0, responseObjects.size()); + } @After public void tearDown() @@ -413,6 +467,12 @@ public class OverlordResourceTest { return "test"; } + + @Override + public String getDataSource() + { + return "ds_test"; + } } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index b73c59bc884..d1f404ba3d5 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -394,6 +394,12 @@ public class OverlordTest { return task.getType(); } + + @Override + public String getDataSource() + { + return task.getDataSource(); + } }; taskRunnerWorkItems.put(taskId, taskRunnerWorkItem); return future;