Fix bug in TaskStorageQueryAdapter (#16750)

Changes:
- Do not hold a reference to `TaskQueue` in `TaskStorageQueryAdapter`
- Use `TaskStorage` instead of `TaskStorageQueryAdapter` in `IndexerMetadataStorageAdapter`
- Rename `TaskStorageQueryAdapter` to `TaskQueryTool`
- Fix newly added task actions `RetrieveUpgradedFromSegmentIds` and `RetrieveUpgradedToSegmentIds`
by removing `isAudited` method.
This commit is contained in:
Kashif Faraz 2024-07-17 10:47:41 -07:00 committed by GitHub
parent 40ef9fc4ec
commit 89066b72cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 116 additions and 109 deletions

View File

@ -73,12 +73,6 @@ public class RetrieveUpgradedFromSegmentIdsAction implements TaskAction<Upgraded
); );
} }
@Override
public boolean isAudited()
{
return false;
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -78,12 +78,6 @@ public class RetrieveUpgradedToSegmentIdsAction implements TaskAction<UpgradedTo
); );
} }
@Override
public boolean isAudited()
{
return false;
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -25,6 +25,7 @@ import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.TaskLookup;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.Comparator; import java.util.Comparator;
@ -32,16 +33,16 @@ import java.util.Optional;
public class IndexerMetadataStorageAdapter public class IndexerMetadataStorageAdapter
{ {
private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskStorage taskStorage;
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@Inject @Inject
public IndexerMetadataStorageAdapter( public IndexerMetadataStorageAdapter(
TaskStorageQueryAdapter taskStorageQueryAdapter, TaskStorage taskStorage,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator
) )
{ {
this.taskStorageQueryAdapter = taskStorageQueryAdapter; this.taskStorage = taskStorage;
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator; this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
} }
@ -49,8 +50,8 @@ public class IndexerMetadataStorageAdapter
{ {
// Find the earliest active task created for the specified datasource; if one exists, // Find the earliest active task created for the specified datasource; if one exists,
// check if its interval overlaps with the delete interval. // check if its interval overlaps with the delete interval.
final Optional<TaskInfo<Task, TaskStatus>> earliestActiveTaskOptional = taskStorageQueryAdapter final Optional<TaskInfo<Task, TaskStatus>> earliestActiveTaskOptional = taskStorage
.getActiveTaskInfo(dataSource) .getTaskInfos(TaskLookup.activeTasksOnly(), dataSource)
.stream() .stream()
.min(Comparator.comparing(TaskInfo::getCreatedTime)); .min(Comparator.comparing(TaskInfo::getCreatedTime));

View File

@ -27,7 +27,6 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -36,25 +35,23 @@ import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
* Wraps a {@link TaskStorage}, providing a useful collection of read-only methods. * Provides read-only methods to fetch information related to tasks.
* This class may serve information that is cached in memory in {@link TaskQueue}
* or {@link TaskLockbox}. If not present in memory, then the underlying
* {@link TaskStorage} is queried.
*/ */
public class TaskStorageQueryAdapter public class TaskQueryTool
{ {
private final TaskStorage storage; private final TaskStorage storage;
private final TaskLockbox taskLockbox; private final TaskLockbox taskLockbox;
private final Optional<TaskQueue> taskQueue; private final TaskMaster taskMaster;
@Inject @Inject
public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster) public TaskQueryTool(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster)
{ {
this.storage = storage; this.storage = storage;
this.taskLockbox = taskLockbox; this.taskLockbox = taskLockbox;
this.taskQueue = taskMaster.getTaskQueue(); this.taskMaster = taskMaster;
}
public List<Task> getActiveTasks()
{
return storage.getActiveTasks();
} }
/** /**
@ -85,7 +82,7 @@ public class TaskStorageQueryAdapter
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource) public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
{ {
return storage.getTaskInfos( return storage.getTaskInfos(
ActiveTaskLookup.getInstance(), TaskLookup.activeTasksOnly(),
dataSource dataSource
); );
} }
@ -98,20 +95,21 @@ public class TaskStorageQueryAdapter
return storage.getTaskStatusPlusList(taskLookups, dataSource); return storage.getTaskStatusPlusList(taskLookups, dataSource);
} }
public Optional<Task> getTask(final String taskid) public Optional<Task> getTask(final String taskId)
{ {
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) { if (taskQueue.isPresent()) {
Optional<Task> activeTask = taskQueue.get().getActiveTask(taskid); Optional<Task> activeTask = taskQueue.get().getActiveTask(taskId);
if (activeTask.isPresent()) { if (activeTask.isPresent()) {
return activeTask; return activeTask;
} }
} }
return storage.getTask(taskid); return storage.getTask(taskId);
} }
public Optional<TaskStatus> getStatus(final String taskid) public Optional<TaskStatus> getStatus(final String taskId)
{ {
return storage.getStatus(taskid); return storage.getStatus(taskId);
} }
@Nullable @Nullable

View File

@ -47,10 +47,10 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
@ -125,7 +125,7 @@ public class OverlordResource
private static final Logger log = new Logger(OverlordResource.class); private static final Logger log = new Logger(OverlordResource.class);
private final TaskMaster taskMaster; private final TaskMaster taskMaster;
private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskQueryTool taskQueryTool;
private final IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; private final IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
private final TaskLogStreamer taskLogStreamer; private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager; private final JacksonConfigManager configManager;
@ -162,7 +162,7 @@ public class OverlordResource
@Inject @Inject
public OverlordResource( public OverlordResource(
TaskMaster taskMaster, TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter, TaskQueryTool taskQueryTool,
IndexerMetadataStorageAdapter indexerMetadataStorageAdapter, IndexerMetadataStorageAdapter indexerMetadataStorageAdapter,
TaskLogStreamer taskLogStreamer, TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager, JacksonConfigManager configManager,
@ -174,7 +174,7 @@ public class OverlordResource
) )
{ {
this.taskMaster = taskMaster; this.taskMaster = taskMaster;
this.taskStorageQueryAdapter = taskStorageQueryAdapter; this.taskQueryTool = taskQueryTool;
this.indexerMetadataStorageAdapter = indexerMetadataStorageAdapter; this.indexerMetadataStorageAdapter = indexerMetadataStorageAdapter;
this.taskLogStreamer = taskLogStreamer; this.taskLogStreamer = taskLogStreamer;
this.configManager = configManager; this.configManager = configManager;
@ -284,7 +284,7 @@ public class OverlordResource
} }
// Build the response // Build the response
return Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build(); return Response.ok(taskQueryTool.getLockedIntervals(minTaskPriority)).build();
} }
@POST @POST
@ -298,7 +298,7 @@ public class OverlordResource
} }
// Build the response // Build the response
return Response.ok(taskStorageQueryAdapter.getLockedIntervals(lockFilterPolicies)).build(); return Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build();
} }
@GET @GET
@ -309,7 +309,7 @@ public class OverlordResource
{ {
final TaskPayloadResponse response = new TaskPayloadResponse( final TaskPayloadResponse response = new TaskPayloadResponse(
taskid, taskid,
taskStorageQueryAdapter.getTask(taskid).orNull() taskQueryTool.getTask(taskid).orNull()
); );
final Response.Status status = response.getPayload() == null final Response.Status status = response.getPayload() == null
@ -325,7 +325,7 @@ public class OverlordResource
@ResourceFilters(TaskResourceFilter.class) @ResourceFilters(TaskResourceFilter.class)
public Response getTaskStatus(@PathParam("taskid") String taskid) public Response getTaskStatus(@PathParam("taskid") String taskid)
{ {
final TaskInfo<Task, TaskStatus> taskInfo = taskStorageQueryAdapter.getTaskInfo(taskid); final TaskInfo<Task, TaskStatus> taskInfo = taskQueryTool.getTaskInfo(taskid);
TaskStatusResponse response = null; TaskStatusResponse response = null;
if (taskInfo != null) { if (taskInfo != null) {
@ -440,7 +440,7 @@ public class OverlordResource
@Override @Override
public Response apply(TaskQueue taskQueue) public Response apply(TaskQueue taskQueue)
{ {
final List<TaskInfo<Task, TaskStatus>> tasks = taskStorageQueryAdapter.getActiveTaskInfo(dataSource); final List<TaskInfo<Task, TaskStatus>> tasks = taskQueryTool.getActiveTaskInfo(dataSource);
if (tasks.isEmpty()) { if (tasks.isEmpty()) {
return Response.status(Status.NOT_FOUND).build(); return Response.status(Status.NOT_FOUND).build();
} else { } else {
@ -471,7 +471,7 @@ public class OverlordResource
if (taskQueue.isPresent()) { if (taskQueue.isPresent()) {
optional = taskQueue.get().getTaskStatus(taskId); optional = taskQueue.get().getTaskStatus(taskId);
} else { } else {
optional = taskStorageQueryAdapter.getStatus(taskId); optional = taskQueryTool.getStatus(taskId);
} }
if (optional.isPresent()) { if (optional.isPresent()) {
result.put(taskId, optional.get()); result.put(taskId, optional.get());
@ -866,7 +866,7 @@ public class OverlordResource
throw new IAE("Unknown state: [%s]", state); throw new IAE("Unknown state: [%s]", state);
} }
final Stream<TaskStatusPlus> taskStatusPlusStream = taskStorageQueryAdapter.getTaskStatusPlusList( final Stream<TaskStatusPlus> taskStatusPlusStream = taskQueryTool.getTaskStatusPlusList(
taskLookups, taskLookups,
dataSource dataSource
).stream(); ).stream();

View File

@ -26,7 +26,7 @@ import com.google.inject.Inject;
import com.sun.jersey.spi.container.ContainerRequest; import com.sun.jersey.spi.container.ContainerRequest;
import org.apache.druid.common.utils.IdUtils; import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.http.security.AbstractResourceFilter; import org.apache.druid.server.http.security.AbstractResourceFilter;
import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Access;
@ -49,16 +49,16 @@ import javax.ws.rs.core.Response;
*/ */
public class TaskResourceFilter extends AbstractResourceFilter public class TaskResourceFilter extends AbstractResourceFilter
{ {
private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskQueryTool taskQueryTool;
@Inject @Inject
public TaskResourceFilter( public TaskResourceFilter(
TaskStorageQueryAdapter taskStorageQueryAdapter, TaskQueryTool taskQueryTool,
AuthorizerMapper authorizerMapper AuthorizerMapper authorizerMapper
) )
{ {
super(authorizerMapper); super(authorizerMapper);
this.taskStorageQueryAdapter = taskStorageQueryAdapter; this.taskQueryTool = taskQueryTool;
} }
@Override @Override
@ -76,7 +76,7 @@ public class TaskResourceFilter extends AbstractResourceFilter
IdUtils.validateId("taskId", taskId); IdUtils.validateId("taskId", taskId);
Optional<Task> taskOptional = taskStorageQueryAdapter.getTask(taskId); Optional<Task> taskOptional = taskQueryTool.getTask(taskId);
if (!taskOptional.isPresent()) { if (!taskOptional.isPresent()) {
throw new WebApplicationException( throw new WebApplicationException(
Response.status(Response.Status.NOT_FOUND) Response.status(Response.Status.NOT_FOUND)

View File

@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.TaskLookup;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -39,7 +40,7 @@ import java.util.List;
public class IndexerMetadataStorageAdapterTest public class IndexerMetadataStorageAdapterTest
{ {
private TaskStorageQueryAdapter taskStorageQueryAdapter; private TaskStorage taskStorage;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
@ -47,9 +48,9 @@ public class IndexerMetadataStorageAdapterTest
public void setup() public void setup()
{ {
indexerMetadataStorageCoordinator = EasyMock.strictMock(IndexerMetadataStorageCoordinator.class); indexerMetadataStorageCoordinator = EasyMock.strictMock(IndexerMetadataStorageCoordinator.class);
taskStorageQueryAdapter = EasyMock.strictMock(TaskStorageQueryAdapter.class); taskStorage = EasyMock.strictMock(TaskStorage.class);
indexerMetadataStorageAdapter = new IndexerMetadataStorageAdapter( indexerMetadataStorageAdapter = new IndexerMetadataStorageAdapter(
taskStorageQueryAdapter, taskStorage,
indexerMetadataStorageCoordinator indexerMetadataStorageCoordinator
); );
} }
@ -73,7 +74,7 @@ public class IndexerMetadataStorageAdapterTest
NoopTask.create() NoopTask.create()
) )
); );
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos); EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource")).andReturn(taskInfos);
final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01"); final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01");
EasyMock EasyMock
@ -84,7 +85,7 @@ public class IndexerMetadataStorageAdapterTest
) )
) )
.andReturn(10); .andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator); EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator);
Assert.assertEquals(10, indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval)); Assert.assertEquals(10, indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval));
} }
@ -109,7 +110,8 @@ public class IndexerMetadataStorageAdapterTest
) )
); );
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos); EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource"))
.andReturn(taskInfos);
final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01"); final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01");
EasyMock EasyMock
@ -120,7 +122,7 @@ public class IndexerMetadataStorageAdapterTest
) )
) )
.andReturn(10); .andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator); EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator);
MatcherAssert.assertThat( MatcherAssert.assertThat(
Assert.assertThrows( Assert.assertThrows(
@ -155,7 +157,8 @@ public class IndexerMetadataStorageAdapterTest
) )
); );
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos); EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource"))
.andReturn(taskInfos);
final Interval deleteInterval = Intervals.of("2017-01-01/2018-12-01"); final Interval deleteInterval = Intervals.of("2017-01-01/2018-12-01");
EasyMock EasyMock
@ -166,7 +169,7 @@ public class IndexerMetadataStorageAdapterTest
) )
) )
.andReturn(10); .andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator); EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator);
MatcherAssert.assertThat( MatcherAssert.assertThat(
Assert.assertThrows( Assert.assertThrows(

View File

@ -234,7 +234,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
private final String taskStorageType; private final String taskStorageType;
private ObjectMapper mapper; private ObjectMapper mapper;
private TaskStorageQueryAdapter tsqa = null; private TaskQueryTool tsqa = null;
private TaskStorage taskStorage = null; private TaskStorage taskStorage = null;
private TaskLockbox taskLockbox = null; private TaskLockbox taskLockbox = null;
private TaskQueue taskQueue = null; private TaskQueue taskQueue = null;
@ -477,7 +477,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster); EasyMock.replay(taskMaster);
tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster); tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster);
return taskStorage; return taskStorage;
} }

View File

@ -42,10 +42,10 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.AutoScaler; import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
@ -106,7 +106,7 @@ public class OverlordResourceTest
private JacksonConfigManager configManager; private JacksonConfigManager configManager;
private ProvisioningStrategy provisioningStrategy; private ProvisioningStrategy provisioningStrategy;
private AuthConfig authConfig; private AuthConfig authConfig;
private TaskStorageQueryAdapter taskStorageQueryAdapter; private TaskQueryTool taskQueryTool;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
private HttpServletRequest req; private HttpServletRequest req;
private TaskRunner taskRunner; private TaskRunner taskRunner;
@ -126,7 +126,7 @@ public class OverlordResourceTest
provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class); provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
authConfig = EasyMock.createMock(AuthConfig.class); authConfig = EasyMock.createMock(AuthConfig.class);
taskMaster = EasyMock.createStrictMock(TaskMaster.class); taskMaster = EasyMock.createStrictMock(TaskMaster.class);
taskStorageQueryAdapter = EasyMock.createStrictMock(TaskStorageQueryAdapter.class); taskQueryTool = EasyMock.createStrictMock(TaskQueryTool.class);
indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class); indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
req = EasyMock.createStrictMock(HttpServletRequest.class); req = EasyMock.createStrictMock(HttpServletRequest.class);
workerTaskRunnerQueryAdapter = EasyMock.createStrictMock(WorkerTaskRunnerQueryAdapter.class); workerTaskRunnerQueryAdapter = EasyMock.createStrictMock(WorkerTaskRunnerQueryAdapter.class);
@ -171,7 +171,7 @@ public class OverlordResourceTest
overlordResource = new OverlordResource( overlordResource = new OverlordResource(
taskMaster, taskMaster,
taskStorageQueryAdapter, taskQueryTool,
indexerMetadataStorageAdapter, indexerMetadataStorageAdapter,
null, null,
configManager, configManager,
@ -189,7 +189,7 @@ public class OverlordResourceTest
EasyMock.verify( EasyMock.verify(
taskRunner, taskRunner,
taskMaster, taskMaster,
taskStorageQueryAdapter, taskQueryTool,
indexerMetadataStorageAdapter, indexerMetadataStorageAdapter,
req, req,
workerTaskRunnerQueryAdapter, workerTaskRunnerQueryAdapter,
@ -202,7 +202,7 @@ public class OverlordResourceTest
EasyMock.replay( EasyMock.replay(
taskRunner, taskRunner,
taskMaster, taskMaster,
taskStorageQueryAdapter, taskQueryTool,
indexerMetadataStorageAdapter, indexerMetadataStorageAdapter,
req, req,
workerTaskRunnerQueryAdapter, workerTaskRunnerQueryAdapter,
@ -247,7 +247,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null null
) )
@ -282,7 +282,7 @@ public class OverlordResourceTest
List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3");
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null) ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null)
).andStubReturn( ).andStubReturn(
ImmutableList.of( ImmutableList.of(
@ -292,6 +292,7 @@ public class OverlordResourceTest
) )
); );
replayAll(); replayAll();
List<TaskStatusPlus> responseObjects = (List) overlordResource List<TaskStatusPlus> responseObjects = (List) overlordResource
.getCompleteTasks(null, req).getEntity(); .getCompleteTasks(null, req).getEntity();
@ -312,7 +313,7 @@ public class OverlordResourceTest
) )
); );
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null null
) )
@ -339,7 +340,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.ACTIVE, TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance(), ActiveTaskLookup.getInstance(),
@ -367,6 +368,7 @@ public class OverlordResourceTest
).atLeastOnce(); ).atLeastOnce();
replayAll(); replayAll();
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks(null, null, null, null, null, req) .getTasks(null, null, null, null, null, req)
.getEntity(); .getEntity();
@ -379,7 +381,7 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
//completed tasks //completed tasks
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.COMPLETE, TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null), CompleteTaskLookup.of(null, null),
@ -421,7 +423,7 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
//active tasks //active tasks
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.ACTIVE, TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance() ActiveTaskLookup.getInstance()
@ -445,6 +447,7 @@ public class OverlordResourceTest
); );
replayAll(); replayAll();
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks( .getTasks(
"waiting", "waiting",
@ -463,7 +466,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.ACTIVE, TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance() ActiveTaskLookup.getInstance()
@ -513,7 +516,7 @@ public class OverlordResourceTest
) )
); );
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null null
) )
@ -547,7 +550,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)),
null null
) )
@ -559,6 +562,7 @@ public class OverlordResourceTest
) )
); );
replayAll(); replayAll();
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks("complete", null, null, null, null, req) .getTasks("complete", null, null, null, null, req)
.getEntity(); .getEntity();
@ -573,7 +577,7 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
Duration duration = new Period("PT86400S").toStandardDuration(); Duration duration = new Period("PT86400S").toStandardDuration();
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject() EasyMock.anyObject()
) )
@ -586,6 +590,7 @@ public class OverlordResourceTest
); );
replayAll(); replayAll();
String interval = "2010-01-01_P1D"; String interval = "2010-01-01_P1D";
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks("complete", null, interval, null, null, req) .getTasks("complete", null, interval, null, null, req)
@ -604,7 +609,7 @@ public class OverlordResourceTest
// Setup mocks to return completed, active, known, pending and running tasks // Setup mocks to return completed, active, known, pending and running tasks
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.COMPLETE, TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null), CompleteTaskLookup.of(null, null),
@ -653,7 +658,7 @@ public class OverlordResourceTest
// Setup mocks to return completed, active, known, pending and running tasks // Setup mocks to return completed, active, known, pending and running tasks
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.COMPLETE, TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null), CompleteTaskLookup.of(null, null),
@ -711,7 +716,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect( EasyMock.expect(
taskStorageQueryAdapter.getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.COMPLETE, TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null) CompleteTaskLookup.of(null, null)
@ -726,6 +731,7 @@ public class OverlordResourceTest
) )
); );
replayAll(); replayAll();
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks("complete", null, null, null, null, req) .getTasks("complete", null, null, null, null, req)
.getEntity(); .getEntity();
@ -738,6 +744,7 @@ public class OverlordResourceTest
public void testGetTasksNegativeState() public void testGetTasksNegativeState()
{ {
replayAll(); replayAll();
Object responseObject = overlordResource Object responseObject = overlordResource
.getTasks("blah", "ds_test", null, null, null, req) .getTasks("blah", "ds_test", null, null, null, req)
.getEntity(); .getEntity();
@ -755,6 +762,7 @@ public class OverlordResourceTest
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false); EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
replayAll(); replayAll();
Task task = NoopTask.create(); Task task = NoopTask.create();
overlordResource.taskPost(task, req); overlordResource.taskPost(task, req);
} }
@ -936,10 +944,10 @@ public class OverlordResourceTest
// This should be fixed in https://github.com/apache/druid/issues/6685. // This should be fixed in https://github.com/apache/druid/issues/6685.
// expectAuthorizationTokenCheck(); // expectAuthorizationTokenCheck();
final NoopTask task = NoopTask.create(); final NoopTask task = NoopTask.create();
EasyMock.expect(taskStorageQueryAdapter.getTask("mytask")) EasyMock.expect(taskQueryTool.getTask("mytask"))
.andReturn(Optional.of(task)); .andReturn(Optional.of(task));
EasyMock.expect(taskStorageQueryAdapter.getTask("othertask")) EasyMock.expect(taskQueryTool.getTask("othertask"))
.andReturn(Optional.absent()); .andReturn(Optional.absent());
replayAll(); replayAll();
@ -970,7 +978,7 @@ public class OverlordResourceTest
final String taskId = task.getId(); final String taskId = task.getId();
final TaskStatus status = TaskStatus.running(taskId); final TaskStatus status = TaskStatus.running(taskId);
EasyMock.expect(taskStorageQueryAdapter.getTaskInfo(taskId)) EasyMock.expect(taskQueryTool.getTaskInfo(taskId))
.andReturn(new TaskInfo( .andReturn(new TaskInfo(
task.getId(), task.getId(),
DateTimes.of("2018-01-01"), DateTimes.of("2018-01-01"),
@ -979,7 +987,7 @@ public class OverlordResourceTest
task task
)); ));
EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("othertask")) EasyMock.expect(taskQueryTool.getTaskInfo("othertask"))
.andReturn(null); .andReturn(null);
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()) EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks())
@ -1034,7 +1042,7 @@ public class OverlordResourceTest
) )
); );
EasyMock.expect(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)) EasyMock.expect(taskQueryTool.getLockedIntervals(minTaskPriority))
.andReturn(expectedLockedIntervals); .andReturn(expectedLockedIntervals);
replayAll(); replayAll();
@ -1104,7 +1112,7 @@ public class OverlordResourceTest
EasyMock.expect(taskMaster.getTaskQueue()).andReturn( EasyMock.expect(taskMaster.getTaskQueue()).andReturn(
Optional.of(mockQueue) Optional.of(mockQueue)
).anyTimes(); ).anyTimes();
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of( EasyMock.expect(taskQueryTool.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of(
new TaskInfo<>( new TaskInfo<>(
"id_1", "id_1",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
@ -1140,7 +1148,7 @@ public class OverlordResourceTest
final TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class); final TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class);
EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(EasyMock.anyString())).andReturn(Collections.emptyList()); EasyMock.expect(taskQueryTool.getActiveTaskInfo(EasyMock.anyString())).andReturn(Collections.emptyList());
replayAll(); replayAll();
final Response response = overlordResource.shutdownTasksForDataSource("notExisting"); final Response response = overlordResource.shutdownTasksForDataSource("notExisting");
@ -1232,6 +1240,7 @@ public class OverlordResourceTest
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
replayAll(); replayAll();
final Response response = overlordResource.getTotalWorkerCapacity(); final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
@ -1247,6 +1256,7 @@ public class OverlordResourceTest
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
replayAll(); replayAll();
final Response response = overlordResource.getTotalWorkerCapacity(); final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
@ -1263,6 +1273,7 @@ public class OverlordResourceTest
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
replayAll(); replayAll();
final Response response = overlordResource.getTotalWorkerCapacity(); final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
@ -1471,15 +1482,16 @@ public class OverlordResourceTest
{ {
replayAll(); replayAll();
TaskStorageQueryAdapter taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class); TaskQueryTool taskQueryTool = EasyMock.createMock(TaskQueryTool.class);
EasyMock.expect(taskStorageQueryAdapter.getStatus("task")) EasyMock.expect(taskQueryTool.getStatus("task"))
.andReturn(Optional.of(TaskStatus.running("task"))); .andReturn(Optional.of(TaskStatus.running("task")));
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent());
EasyMock.replay(taskMaster, taskStorageQueryAdapter); EasyMock.replay(taskMaster, taskQueryTool);
OverlordResource overlordResource = new OverlordResource( OverlordResource overlordResource = new OverlordResource(
taskMaster, taskMaster,
taskStorageQueryAdapter, taskQueryTool,
null, null,
null, null,
null, null,

View File

@ -52,12 +52,12 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
@ -269,14 +269,14 @@ public class OverlordTest
Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort()); Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort());
Assert.assertEquals(Optional.absent(), taskMaster.getRedirectLocation()); Assert.assertEquals(Optional.absent(), taskMaster.getRedirectLocation());
final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster); final TaskQueryTool taskQueryTool = new TaskQueryTool(taskStorage, taskLockbox, taskMaster);
final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null); final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null);
// Test Overlord resource stuff // Test Overlord resource stuff
AuditManager auditManager = EasyMock.createNiceMock(AuditManager.class); AuditManager auditManager = EasyMock.createNiceMock(AuditManager.class);
overlordResource = new OverlordResource( overlordResource = new OverlordResource(
taskMaster, taskMaster,
taskStorageQueryAdapter, taskQueryTool,
new IndexerMetadataStorageAdapter(taskStorageQueryAdapter, null), new IndexerMetadataStorageAdapter(taskStorage, null),
null, null,
null, null,
auditManager, auditManager,

View File

@ -26,7 +26,7 @@ import com.google.inject.Injector;
import com.sun.jersey.spi.container.ResourceFilter; import com.sun.jersey.spi.container.ResourceFilter;
import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.http.OverlordResource; import org.apache.druid.indexing.overlord.http.OverlordResource;
import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
@ -60,7 +60,7 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper
return ImmutableList.copyOf( return ImmutableList.copyOf(
Iterables.concat( Iterables.concat(
getRequestPaths(OverlordResource.class, ImmutableList.of( getRequestPaths(OverlordResource.class, ImmutableList.of(
TaskStorageQueryAdapter.class, TaskQueryTool.class,
AuthorizerMapper.class AuthorizerMapper.class
) )
), ),
@ -84,7 +84,7 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper
private static boolean mockedOnceTsqa; private static boolean mockedOnceTsqa;
private static boolean mockedOnceSM; private static boolean mockedOnceSM;
private TaskStorageQueryAdapter tsqa; private TaskQueryTool tsqa;
private SupervisorManager supervisorManager; private SupervisorManager supervisorManager;
public OverlordSecurityResourceFilterTest( public OverlordSecurityResourceFilterTest(
@ -107,7 +107,7 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper
// Since we are creating the mocked tsqa object only once and getting that object from Guice here therefore // Since we are creating the mocked tsqa object only once and getting that object from Guice here therefore
// if the mockedOnce check is not done then we will call EasyMock.expect and EasyMock.replay on the mocked object // if the mockedOnce check is not done then we will call EasyMock.expect and EasyMock.replay on the mocked object
// multiple times and it will throw exceptions // multiple times and it will throw exceptions
tsqa = injector.getInstance(TaskStorageQueryAdapter.class); tsqa = injector.getInstance(TaskQueryTool.class);
EasyMock.expect(tsqa.getTask(EasyMock.anyString())).andReturn(Optional.of(noopTask)).anyTimes(); EasyMock.expect(tsqa.getTask(EasyMock.anyString())).andReturn(Optional.of(noopTask)).anyTimes();
EasyMock.replay(tsqa); EasyMock.replay(tsqa);
mockedOnceTsqa = true; mockedOnceTsqa = true;

View File

@ -21,7 +21,7 @@ package org.apache.druid.indexing.overlord.http.security;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.sun.jersey.spi.container.ContainerRequest; import com.sun.jersey.spi.container.ContainerRequest;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.AuthorizerMapper;
import org.easymock.EasyMock; import org.easymock.EasyMock;
@ -42,7 +42,7 @@ import static org.easymock.EasyMock.expect;
public class TaskResourceFilterTest public class TaskResourceFilterTest
{ {
private AuthorizerMapper authorizerMapper; private AuthorizerMapper authorizerMapper;
private TaskStorageQueryAdapter taskStorageQueryAdapter; private TaskQueryTool taskQueryTool;
private ContainerRequest containerRequest; private ContainerRequest containerRequest;
private TaskResourceFilter resourceFilter; private TaskResourceFilter resourceFilter;
@ -50,9 +50,9 @@ public class TaskResourceFilterTest
public void setup() public void setup()
{ {
authorizerMapper = EasyMock.createMock(AuthorizerMapper.class); authorizerMapper = EasyMock.createMock(AuthorizerMapper.class);
taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class); taskQueryTool = EasyMock.createMock(TaskQueryTool.class);
containerRequest = EasyMock.createMock(ContainerRequest.class); containerRequest = EasyMock.createMock(ContainerRequest.class);
resourceFilter = new TaskResourceFilter(taskStorageQueryAdapter, authorizerMapper); resourceFilter = new TaskResourceFilter(taskQueryTool, authorizerMapper);
} }
@Test @Test
@ -68,11 +68,11 @@ public class TaskResourceFilterTest
expect(supervisorSpec.getDataSources()) expect(supervisorSpec.getDataSources())
.andReturn(Collections.singletonList(taskId)) .andReturn(Collections.singletonList(taskId))
.anyTimes(); .anyTimes();
expect(taskStorageQueryAdapter.getTask(taskId)) expect(taskQueryTool.getTask(taskId))
.andReturn(Optional.absent()) .andReturn(Optional.absent())
.atLeastOnce(); .atLeastOnce();
EasyMock.replay(containerRequest); EasyMock.replay(containerRequest);
EasyMock.replay(taskStorageQueryAdapter); EasyMock.replay(taskQueryTool);
WebApplicationException expected = null; WebApplicationException expected = null;
try { try {
@ -84,7 +84,7 @@ public class TaskResourceFilterTest
Assert.assertNotNull(expected); Assert.assertNotNull(expected);
Assert.assertEquals(expected.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); Assert.assertEquals(expected.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
EasyMock.verify(containerRequest); EasyMock.verify(containerRequest);
EasyMock.verify(taskStorageQueryAdapter); EasyMock.verify(taskQueryTool);
} }
private List<PathSegment> getPathSegments(String path) private List<PathSegment> getPathSegments(String path)

View File

@ -43,6 +43,11 @@ public interface TaskLookup
COMPLETE COMPLETE
} }
static TaskLookup activeTasksOnly()
{
return ActiveTaskLookup.getInstance();
}
/** /**
* Whether this lookup is guaranteed to not return any tasks. * Whether this lookup is guaranteed to not return any tasks.
*/ */

View File

@ -130,7 +130,7 @@ public class TaskLookupTest
@Test @Test
public void testGetType() public void testGetType()
{ {
Assert.assertEquals(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance().getType()); Assert.assertEquals(TaskLookupType.ACTIVE, TaskLookup.activeTasksOnly().getType());
} }
} }
} }

View File

@ -74,9 +74,9 @@ import org.apache.druid.indexing.overlord.MetadataTaskStorage;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningConfig; import org.apache.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningConfig;
import org.apache.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
@ -229,7 +229,7 @@ public class CliOverlord extends ServerRunnable
binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class); binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class); binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(TaskLockbox.class).in(LazySingleton.class); binder.bind(TaskLockbox.class).in(LazySingleton.class);
binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class); binder.bind(TaskQueryTool.class).in(LazySingleton.class);
binder.bind(IndexerMetadataStorageAdapter.class).in(LazySingleton.class); binder.bind(IndexerMetadataStorageAdapter.class).in(LazySingleton.class);
binder.bind(SupervisorManager.class).in(LazySingleton.class); binder.bind(SupervisorManager.class).in(LazySingleton.class);