As part of this feature, implemented a new endpoint to get running tasks by datasources (#5260)

and added datasource information as part of existing endpoint /druid/indexer/v1/runningTasks.

Added junit test cases for the newly implemented API and fixed existing junit test cases.

Fixed review comments - added new method getCreatedDateTimeAndDataSource into TaskStorageQueryAdapter class
and formatted changed files
This commit is contained in:
Niraja Mishra 2018-03-13 12:18:11 +05:30 committed by Gian Merlino
parent e968811583
commit 96cebfc222
16 changed files with 268 additions and 93 deletions

View File

@ -36,6 +36,7 @@ public class TaskStatusPlus
private final TaskState state; private final TaskState state;
private final Long duration; private final Long duration;
private final TaskLocation location; private final TaskLocation location;
private final String dataSource;
@JsonCreator @JsonCreator
public TaskStatusPlus( public TaskStatusPlus(
@ -45,7 +46,8 @@ public class TaskStatusPlus
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime, @JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
@JsonProperty("statusCode") @Nullable TaskState state, @JsonProperty("statusCode") @Nullable TaskState state,
@JsonProperty("duration") @Nullable Long duration, @JsonProperty("duration") @Nullable Long duration,
@JsonProperty("location") TaskLocation location @JsonProperty("location") TaskLocation location,
@JsonProperty("dataSource") String dataSource
) )
{ {
if (state != null && state.isComplete()) { if (state != null && state.isComplete()) {
@ -58,6 +60,7 @@ public class TaskStatusPlus
this.state = state; this.state = state;
this.duration = duration; this.duration = duration;
this.location = Preconditions.checkNotNull(location, "location"); this.location = Preconditions.checkNotNull(location, "location");
this.dataSource = dataSource;
} }
@JsonProperty @JsonProperty
@ -143,4 +146,11 @@ public class TaskStatusPlus
{ {
return Objects.hash(id, type, createdTime, queueInsertionTime, state, duration, location); return Objects.hash(id, type, createdTime, queueInsertionTime, state, duration, location);
} }
@JsonProperty
public String getDataSource()
{
return dataSource;
}
} }

View File

@ -52,7 +52,8 @@ public class TaskStatusPlusTest
DateTimes.nowUtc(), DateTimes.nowUtc(),
TaskState.RUNNING, TaskState.RUNNING,
1000L, 1000L,
TaskLocation.create("testHost", 1010, -1) TaskLocation.create("testHost", 1010, -1),
"ds_test"
); );
final String json = mapper.writeValueAsString(status); final String json = mapper.writeValueAsString(status);
Assert.assertEquals(status, mapper.readValue(json, TaskStatusPlus.class)); Assert.assertEquals(status, mapper.readValue(json, TaskStatusPlus.class));

View File

@ -2060,12 +2060,14 @@ public class KafkaSupervisorTest extends EasyMockSupport
{ {
private final String taskType; private final String taskType;
private final TaskLocation location; private final TaskLocation location;
private final String dataSource;
public TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result, TaskLocation location) public TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result, TaskLocation location)
{ {
super(task.getId(), result); super(task.getId(), result);
this.taskType = task.getType(); this.taskType = task.getType();
this.location = location; this.location = location;
this.dataSource = task.getDataSource();
} }
@Override @Override
@ -2079,6 +2081,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
{ {
return taskType; return taskType;
} }
@Override
public String getDataSource()
{
return dataSource;
}
} }
private static class TestableKafkaSupervisor extends KafkaSupervisor private static class TestableKafkaSupervisor extends KafkaSupervisor

View File

@ -772,6 +772,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
{ {
return task.getType(); return task.getType();
} }
@Override
public String getDataSource()
{
return task.getDataSource();
}
} }
private static class ProcessHolder private static class ProcessHolder

View File

@ -628,7 +628,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
task.getId(), task.getId(),
task.getType(), task.getType(),
null, null,
null null,
task.getDataSource()
); );
pendingTaskPayloads.put(task.getId(), task); pendingTaskPayloads.put(task.getId(), task);
pendingTasks.put(task.getId(), taskRunnerWorkItem); pendingTasks.put(task.getId(), taskRunnerWorkItem);
@ -966,7 +967,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
taskId, taskId,
announcement.getTaskType(), announcement.getTaskType(),
zkWorker.getWorker(), zkWorker.getWorker(),
TaskLocation.unknown() TaskLocation.unknown(),
runningTasks.get(taskId).getDataSource()
); );
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent( final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId, taskId,

View File

@ -31,6 +31,7 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{ {
private final SettableFuture<TaskStatus> result; private final SettableFuture<TaskStatus> result;
private String taskType; private String taskType;
private final String dataSource;
private Worker worker; private Worker worker;
private TaskLocation location; private TaskLocation location;
@ -38,10 +39,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
String taskId, String taskId,
String taskType, String taskType,
Worker worker, Worker worker,
TaskLocation location TaskLocation location,
String dataSource
) )
{ {
this(taskId, taskType, SettableFuture.<TaskStatus>create(), worker, location); this(taskId, taskType, SettableFuture.<TaskStatus>create(), worker, location, dataSource);
} }
private RemoteTaskRunnerWorkItem( private RemoteTaskRunnerWorkItem(
@ -49,7 +51,8 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
String taskType, String taskType,
SettableFuture<TaskStatus> result, SettableFuture<TaskStatus> result,
Worker worker, Worker worker,
TaskLocation location TaskLocation location,
String dataSource
) )
{ {
super(taskId, result); super(taskId, result);
@ -57,6 +60,7 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
this.taskType = taskType; this.taskType = taskType;
this.worker = worker; this.worker = worker;
this.location = location == null ? TaskLocation.unknown() : location; this.location = location == null ? TaskLocation.unknown() : location;
this.dataSource = dataSource;
} }
private RemoteTaskRunnerWorkItem( private RemoteTaskRunnerWorkItem(
@ -66,7 +70,8 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
DateTime createdTime, DateTime createdTime,
DateTime queueInsertionTime, DateTime queueInsertionTime,
Worker worker, Worker worker,
TaskLocation location TaskLocation location,
String dataSource
) )
{ {
super(taskId, result, createdTime, queueInsertionTime); super(taskId, result, createdTime, queueInsertionTime);
@ -74,6 +79,7 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
this.taskType = taskType; this.taskType = taskType;
this.worker = worker; this.worker = worker;
this.location = location == null ? TaskLocation.unknown() : location; this.location = location == null ? TaskLocation.unknown() : location;
this.dataSource = dataSource;
} }
public void setLocation(TaskLocation location) public void setLocation(TaskLocation location)
@ -97,6 +103,12 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{ {
return taskType; return taskType;
} }
@Override
public String getDataSource()
{
return dataSource;
}
public void setWorker(Worker worker) public void setWorker(Worker worker)
{ {
@ -115,7 +127,7 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time) public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{ {
return new RemoteTaskRunnerWorkItem(getTaskId(), taskType, result, getCreatedTime(), time, worker, location); return new RemoteTaskRunnerWorkItem(getTaskId(), taskType, result, getCreatedTime(), time, worker, location, dataSource);
} }
public RemoteTaskRunnerWorkItem withWorker(Worker theWorker, TaskLocation location) public RemoteTaskRunnerWorkItem withWorker(Worker theWorker, TaskLocation location)
@ -127,7 +139,8 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
getCreatedTime(), getCreatedTime(),
getQueueInsertionTime(), getQueueInsertionTime(),
theWorker, theWorker,
location location,
dataSource
); );
} }
} }

View File

@ -93,6 +93,7 @@ public abstract class TaskRunnerWorkItem
*/ */
@Nullable @Nullable
public abstract String getTaskType(); public abstract String getTaskType();
public abstract String getDataSource();
@Override @Override
public String toString() public String toString()

View File

@ -97,4 +97,10 @@ public class TaskStorageQueryAdapter
} }
return segments; return segments;
} }
public Pair<DateTime, String> getCreatedDateAndDataSource(String taskId)
{
return storage.getCreatedDateTimeAndDataSource(taskId);
}
} }

View File

@ -417,6 +417,12 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
{ {
return task.getType(); return task.getType();
} }
@Override
public String getDataSource()
{
return task.getDataSource();
}
} }
private class ThreadPoolTaskRunnerCallable implements Callable<TaskStatus> private class ThreadPoolTaskRunnerCallable implements Callable<TaskStatus>

View File

@ -1343,7 +1343,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
State state State state
) )
{ {
super(taskId, taskType, worker, location); super(taskId, task == null ? null : task.getType(), worker, location, task == null ? null : task.getDataSource());
this.state = Preconditions.checkNotNull(state); this.state = Preconditions.checkNotNull(state);
Preconditions.checkArgument(task == null || taskType == null || taskType.equals(task.getType())); Preconditions.checkArgument(task == null || taskType == null || taskType.equals(task.getType()));

View File

@ -55,6 +55,7 @@ import io.druid.indexing.overlord.http.security.TaskResourceFilter;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.EntryExistsException; import io.druid.metadata.EntryExistsException;
@ -70,6 +71,7 @@ import io.druid.server.security.ResourceAction;
import io.druid.server.security.ResourceType; import io.druid.server.security.ResourceType;
import io.druid.tasklogs.TaskLogStreamer; import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -460,7 +462,8 @@ public class OverlordResource
new WaitingTask( new WaitingTask(
task.getId(), task.getId(),
task.getType(), task.getType(),
SettableFuture.create() SettableFuture.create(),
task.getDataSource()
) )
{ {
@Override @Override
@ -481,15 +484,18 @@ public class OverlordResource
private static class WaitingTask extends TaskRunnerWorkItem private static class WaitingTask extends TaskRunnerWorkItem
{ {
private final String taskType; private final String taskType;
private final String dataSource;
WaitingTask( WaitingTask(
String taskId, String taskId,
String taskType, String taskType,
ListenableFuture<TaskStatus> result ListenableFuture<TaskStatus> result,
String dataSource
) )
{ {
super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH); super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH);
this.taskType = taskType; this.taskType = taskType;
this.dataSource = dataSource;
} }
@Override @Override
@ -503,6 +509,12 @@ public class OverlordResource
{ {
return taskType; return taskType;
} }
@Override
public String getDataSource()
{
return dataSource;
}
} }
@GET @GET
@ -595,20 +607,22 @@ public class OverlordResource
) )
); );
final List<TaskStatusPlus> completeTasks = recentlyFinishedTasks final List<TaskStatusPlus> completeTasks = Lists.newArrayList(Iterables.transform(
.stream() recentlyFinishedTasks,
.map(status -> new TaskStatusPlus( status -> {
status.getId(), final Pair<DateTime, String> pair = taskStorageQueryAdapter.getCreatedDateAndDataSource(status.getId());
taskFunction.apply(status.getId()).getType(), return new TaskStatusPlus(
taskStorageQueryAdapter.getCreatedTime(status.getId()), status.getId(),
// Would be nice to include the real queue insertion time, but the TaskStorage API doesn't yet allow it. taskFunction.apply(status.getId()).getType(),
DateTimes.EPOCH, pair.lhs,
status.getStatusCode(), // Would be nice to include the real queue insertion time, but the
status.getDuration(), // TaskStorage API doesn't yet allow it.
TaskLocation.unknown() DateTimes.EPOCH,
) status.getStatusCode(),
) status.getDuration(),
.collect(Collectors.toList()); TaskLocation.unknown(),
pair.rhs);
}));
return Response.ok(completeTasks).build(); return Response.ok(completeTasks).build();
} }
@ -718,6 +732,26 @@ public class OverlordResource
} }
} }
@GET
@Path("/dataSources/{dataSource}")
@Produces(MediaType.APPLICATION_JSON)
public Response getRunningTasksByDataSource(@PathParam("dataSource") String dataSource,
@Context HttpServletRequest request)
{
Optional<TaskRunner> ts = taskMaster.getTaskRunner();
if (!ts.isPresent()) {
return Response.status(Response.Status.NOT_FOUND).entity("No tasks are running").build();
}
Collection<? extends TaskRunnerWorkItem> runningTasks = ts.get().getRunningTasks();
if (runningTasks == null || runningTasks.isEmpty()) {
return Response.status(Response.Status.NOT_FOUND)
.entity("No running tasks found for the datasource : " + dataSource).build();
}
List<TaskRunnerWorkItem> taskRunnerWorkItemList = runningTasks.stream()
.filter(task -> dataSource.equals(task.getDataSource())).collect(Collectors.toList());
return Response.ok(taskRunnerWorkItemList).build();
}
private Response workItemsResponse(final Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>> fn) private Response workItemsResponse(final Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>> fn)
{ {
return asLeaderWith( return asLeaderWith(
@ -742,7 +776,8 @@ public class OverlordResource
workItem.getQueueInsertionTime(), workItem.getQueueInsertionTime(),
null, null,
null, null,
workItem.getLocation() workItem.getLocation(),
workItem.getDataSource()
); );
} }
} }

View File

@ -43,7 +43,6 @@ import io.druid.indexing.worker.Worker;
import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.StringUtils;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After; import org.junit.After;
@ -321,22 +320,41 @@ public class RemoteTaskRunnerTest
@Test @Test
public void testBootstrap() throws Exception public void testBootstrap() throws Exception
{ {
cf.create() Period timeoutPeriod = Period.millis(1000);
.creatingParentsIfNeeded() makeWorker();
.withMode(CreateMode.EPHEMERAL)
.forPath(joiner.join(statusPath, "first"), jsonMapper.writeValueAsBytes(TaskStatus.running("first")));
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(joiner.join(statusPath, "second"), jsonMapper.writeValueAsBytes(TaskStatus.running("second")));
doSetup(); RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod);
rtrConfig.setMaxPercentageBlacklistWorkers(100);
final Set<String> existingTasks = Sets.newHashSet(); makeRemoteTaskRunner(rtrConfig);
for (ImmutableWorkerInfo workerInfo : remoteTaskRunner.getWorkers()) {
existingTasks.addAll(workerInfo.getRunningTasks()); TestRealtimeTask task1 = new TestRealtimeTask(
} "first",
Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks); new TaskResource("first", 1),
"foo",
TaskStatus.running("first"),
jsonMapper);
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
TestRealtimeTask task = new TestRealtimeTask(
"second",
new TaskResource("task", 2),
"foo",
TaskStatus.running("task"),
jsonMapper);
remoteTaskRunner.run(task);
TestRealtimeTask task2 = new TestRealtimeTask(
"second",
new TaskResource("second", 2),
"foo",
TaskStatus.running("second"),
jsonMapper);
remoteTaskRunner.run(task2);
Assert.assertTrue(taskAnnounced(task2.getId()));
mockWorkerRunningTask(task2);
final Set<String> runningTasks = Sets.newHashSet( final Set<String> runningTasks = Sets.newHashSet(
Iterables.transform( Iterables.transform(
@ -357,18 +375,19 @@ public class RemoteTaskRunnerTest
@Test @Test
public void testRunWithTaskComplete() throws Exception public void testRunWithTaskComplete() throws Exception
{ {
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(TaskStatus.success(task.getId())));
doSetup(); doSetup();
TestRealtimeTask task1 = new TestRealtimeTask(
"testTask",
new TaskResource("testTask", 2),
"foo",
TaskStatus.success("testTask"),
jsonMapper);
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
mockWorkerCompleteSuccessfulTask(task1);
ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task); Assert.assertEquals(TaskState.SUCCESS, remoteTaskRunner.run(task1).get().getStatusCode());
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
} }
@Test @Test
@ -621,11 +640,11 @@ public class RemoteTaskRunnerTest
@Test @Test
public void testSortByInsertionTime() throws Exception public void testSortByInsertionTime() throws Exception
{ {
RemoteTaskRunnerWorkItem item1 = new RemoteTaskRunnerWorkItem("b", "t", null, null) RemoteTaskRunnerWorkItem item1 = new RemoteTaskRunnerWorkItem("b", "t", null, null, "ds_test")
.withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:03Z")); .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:03Z"));
RemoteTaskRunnerWorkItem item2 = new RemoteTaskRunnerWorkItem("a", "t", null, null) RemoteTaskRunnerWorkItem item2 = new RemoteTaskRunnerWorkItem("a", "t", null, null, "ds_test")
.withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:02Z")); .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:02Z"));
RemoteTaskRunnerWorkItem item3 = new RemoteTaskRunnerWorkItem("c", "t", null, null) RemoteTaskRunnerWorkItem item3 = new RemoteTaskRunnerWorkItem("c", "t", null, null, "ds_test")
.withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:01Z")); .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:01Z"));
ArrayList<RemoteTaskRunnerWorkItem> workItems = Lists.newArrayList(item1, item2, item3); ArrayList<RemoteTaskRunnerWorkItem> workItems = Lists.newArrayList(item1, item2, item3);
RemoteTaskRunner.sortByInsertionTime(workItems); RemoteTaskRunner.sortByInsertionTime(workItems);

View File

@ -339,7 +339,8 @@ public class PendingTaskBasedProvisioningStrategyTest
testTask.getId(), testTask.getId(),
testTask.getType(), testTask.getType(),
null, null,
TaskLocation.unknown() TaskLocation.unknown(),
testTask.getDataSource()
).withQueueInsertionTime(DateTimes.nowUtc()) ).withQueueInsertionTime(DateTimes.nowUtc())
) )
).times(2); ).times(2);

View File

@ -125,7 +125,7 @@ public class SimpleProvisioningStrategyTest
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn( EasyMock.expect(runner.getPendingTasks()).andReturn(
Collections.singletonList( Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource())
.withQueueInsertionTime(DateTimes.nowUtc()) .withQueueInsertionTime(DateTimes.nowUtc())
) )
); );
@ -163,7 +163,7 @@ public class SimpleProvisioningStrategyTest
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn( EasyMock.expect(runner.getPendingTasks()).andReturn(
Collections.singletonList( Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource())
.withQueueInsertionTime(DateTimes.nowUtc()) .withQueueInsertionTime(DateTimes.nowUtc())
) )
).times(2); ).times(2);
@ -222,7 +222,7 @@ public class SimpleProvisioningStrategyTest
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn( EasyMock.expect(runner.getPendingTasks()).andReturn(
Collections.singletonList( Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource())
.withQueueInsertionTime(DateTimes.nowUtc()) .withQueueInsertionTime(DateTimes.nowUtc())
) )
).times(2); ).times(2);
@ -275,7 +275,7 @@ public class SimpleProvisioningStrategyTest
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn( EasyMock.expect(runner.getPendingTasks()).andReturn(
Collections.singletonList( Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource())
.withQueueInsertionTime(DateTimes.nowUtc()) .withQueueInsertionTime(DateTimes.nowUtc())
) )
).times(2); ).times(2);
@ -316,7 +316,7 @@ public class SimpleProvisioningStrategyTest
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn( EasyMock.expect(runner.getPendingTasks()).andReturn(
Collections.singletonList( Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource())
.withQueueInsertionTime(DateTimes.nowUtc()) .withQueueInsertionTime(DateTimes.nowUtc())
) )
).times(2); ).times(2);
@ -364,7 +364,7 @@ public class SimpleProvisioningStrategyTest
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn( EasyMock.expect(runner.getPendingTasks()).andReturn(
Collections.singletonList( Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource())
.withQueueInsertionTime(DateTimes.nowUtc()) .withQueueInsertionTime(DateTimes.nowUtc())
) )
).times(2); ).times(2);
@ -468,7 +468,7 @@ public class SimpleProvisioningStrategyTest
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn( EasyMock.expect(runner.getPendingTasks()).andReturn(
Collections.singletonList( Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null) new RemoteTaskRunnerWorkItem(testTask.getId(), testTask.getType(), null, null, testTask.getDataSource())
.withQueueInsertionTime(DateTimes.nowUtc()) .withQueueInsertionTime(DateTimes.nowUtc())
) )
).times(2); ).times(2);

View File

@ -40,6 +40,7 @@ import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.DateTimes;
import io.druid.segment.TestHelper; import io.druid.segment.TestHelper;
import io.druid.java.util.common.Pair;
import io.druid.server.security.Access; import io.druid.server.security.Access;
import io.druid.server.security.Action; import io.druid.server.security.Action;
import io.druid.server.security.AuthConfig; import io.druid.server.security.AuthConfig;
@ -49,7 +50,9 @@ import io.druid.server.security.AuthorizerMapper;
import io.druid.server.security.ForbiddenException; import io.druid.server.security.ForbiddenException;
import io.druid.server.security.Resource; import io.druid.server.security.Resource;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -200,7 +203,32 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3");
EasyMock.expect(taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(null)).andReturn( EasyMock.<Collection<? extends TaskRunnerWorkItem>> expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem(tasksIds.get(0), null),
new MockTaskRunnerWorkItem(tasksIds.get(1), null),
new MockTaskRunnerWorkItem(tasksIds.get(2), null)));
EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(0))).andStubReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(0), "deny")));
EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(1))).andStubReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow")));
EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(2))).andStubReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(2), "allow")));
EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(tasksIds.get(0)))
.andStubReturn(DateTimes.EPOCH);
EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(tasksIds.get(1)))
.andStubReturn(DateTimes.EPOCH);
EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(tasksIds.get(2)))
.andStubReturn(DateTimes.EPOCH);
EasyMock.expect(taskStorageQueryAdapter.getCreatedDateAndDataSource(tasksIds.get(0)))
.andStubReturn(new Pair<>(DateTime.now(ISOChronology.getInstanceUTC()), "ds_test"));
EasyMock.expect(taskStorageQueryAdapter.getCreatedDateAndDataSource(tasksIds.get(1)))
.andStubReturn(new Pair<>(DateTime.now(ISOChronology.getInstanceUTC()), "ds_test"));
EasyMock.expect(taskStorageQueryAdapter.getCreatedDateAndDataSource(tasksIds.get(2)))
.andStubReturn(new Pair<>(DateTime.now(ISOChronology.getInstanceUTC()), "ds_test"));
EasyMock.expect(taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(null)).andStubReturn(
Lists.transform( Lists.transform(
tasksIds, tasksIds,
new Function<String, TaskStatus>() new Function<String, TaskStatus>()
@ -210,36 +238,14 @@ public class OverlordResourceTest
{ {
return TaskStatus.success(input); return TaskStatus.success(input);
} }
} }));
)
).once();
EasyMock.expect(taskStorageQueryAdapter.getTask(EasyMock.eq(tasksIds.get(0)))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(0), "deny"))
).once();
EasyMock.expect(taskStorageQueryAdapter.getTask(EasyMock.eq(tasksIds.get(1)))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow"))
).once();
EasyMock.expect(taskStorageQueryAdapter.getTask(EasyMock.eq(tasksIds.get(2)))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(2), "allow"))
).once();
EasyMock.expect(taskStorageQueryAdapter.getTask(EasyMock.eq(tasksIds.get(1)))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow"))
).once();
EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.anyString()))
.andReturn(DateTimes.EPOCH)
.once();
EasyMock.expect(taskStorageQueryAdapter.getTask(EasyMock.eq(tasksIds.get(2)))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(2), "allow"))
).once();
EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.anyString()))
.andReturn(DateTimes.EPOCH)
.once();
EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
Assert.assertTrue(taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(null).size() == 3);
List<TaskStatusPlus> responseObjects = (List) overlordResource.getCompleteTasks(null, req) Assert.assertTrue(taskRunner.getRunningTasks().size() == 3);
.getEntity(); List<TaskStatusPlus> responseObjects = (List) overlordResource
.getCompleteTasks(null, req).getEntity();
Assert.assertEquals(2, responseObjects.size()); Assert.assertEquals(2, responseObjects.size());
Assert.assertEquals(tasksIds.get(1), responseObjects.get(0).getId()); Assert.assertEquals(tasksIds.get(1), responseObjects.get(0).getId());
Assert.assertEquals(tasksIds.get(2), responseObjects.get(1).getId()); Assert.assertEquals(tasksIds.get(2), responseObjects.get(1).getId());
@ -361,6 +367,54 @@ public class OverlordResourceTest
); );
Assert.assertEquals(new TaskStatusResponse("othertask", null), taskStatusResponse2); Assert.assertEquals(new TaskStatusResponse("othertask", null), taskStatusResponse2);
} }
@Test
public void testGetRunningTasksByDataSource()
{
List<String> tasksIds = ImmutableList.of("id_1", "id_2");
EasyMock.<Collection<? extends TaskRunnerWorkItem>> expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem(tasksIds.get(0), null),
new MockTaskRunnerWorkItem(tasksIds.get(1), null)));
EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(0))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(0), "deny"))).once();
EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(1))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow"))).once();
EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
List<TaskRunnerWorkItem> responseObjects = (List) overlordResource.getRunningTasksByDataSource("ds_test", req)
.getEntity();
Assert.assertEquals(2, responseObjects.size());
Assert.assertEquals(taskStorageQueryAdapter.getTask("id_1").get().getId(), responseObjects.get(0).getTaskId());
Assert.assertEquals(taskStorageQueryAdapter.getTask("id_2").get().getId(), responseObjects.get(1).getTaskId());
Assert.assertTrue("DataSource Check", "ds_test".equals(responseObjects.get(0).getDataSource()));
}
@Test
public void testGetRunningTasksByDataSourceNeg()
{
expectAuthorizationTokenCheck();
List<String> tasksIds = ImmutableList.of("id_1", "id_2");
EasyMock.<Collection<? extends TaskRunnerWorkItem>> expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem(tasksIds.get(0), null),
new MockTaskRunnerWorkItem(tasksIds.get(1), null)));
EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(0))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(0), "deny"))).once();
EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(1))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow"))).once();
EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
Assert.assertTrue(taskStorageQueryAdapter.getTask("id_1").isPresent());
Assert.assertTrue(taskStorageQueryAdapter.getTask("id_2").isPresent());
List<TaskRunnerWorkItem> responseObjects = (List) overlordResource.getRunningTasksByDataSource("ds_NA", req)
.getEntity();
Assert.assertEquals(0, responseObjects.size());
}
@After @After
public void tearDown() public void tearDown()
@ -413,6 +467,12 @@ public class OverlordResourceTest
{ {
return "test"; return "test";
} }
@Override
public String getDataSource()
{
return "ds_test";
}
} }
} }

View File

@ -394,6 +394,12 @@ public class OverlordTest
{ {
return task.getType(); return task.getType();
} }
@Override
public String getDataSource()
{
return task.getDataSource();
}
}; };
taskRunnerWorkItems.put(taskId, taskRunnerWorkItem); taskRunnerWorkItems.put(taskId, taskRunnerWorkItem);
return future; return future;