mirror of https://github.com/apache/druid.git
Added support for filtering by unused parameter for HeapMemoryTaskStorage (#6510)
* 1. added support for unused DateTime start parameter in getRecentlyFinishedTaskInfoSince method: HeapMemoryTaskStorage.getRecentlyFinishedTaskInfoSince return the finished tasks by comparing TaskStuff.createdDate with the start time 2. added filtering by status complete to TaskStuff list stream in HeapMemoryTaskStorage.getNRecentlyFinishedTaskInfo method. 3. changed names of methods and parameters to present that public API method OverlordResource.getTasks return the list of completed tasks, which createdDate, not date of completion, belongs to the interval parameter. * 1. added support for unused DateTime start parameter in getRecentlyFinishedTaskInfoSince method: HeapMemoryTaskStorage.getRecentlyFinishedTaskInfoSince return the finished tasks by comparing TaskStuff.createdDate with the start time 2. added filtering by status complete to TaskStuff list stream in HeapMemoryTaskStorage.getNRecentlyFinishedTaskInfo method. 3. changed names of methods and parameters to present that public API method OverlordResource.getTasks return the list of completed tasks, which createdDate, not date of completion, belongs to the interval parameter. * Fixed OverlordResourceTest to Support changed methods names * Changed methods and parameters names to make them more obvious to understand. * Changed String.replace() for the StringUtils.replace()(#6607) * Fixed checkstyle error
This commit is contained in:
parent
87b96fb1fd
commit
81c9a6177c
|
@ -221,9 +221,9 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
|
||||
public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
|
||||
@Nullable Integer maxTaskStatuses,
|
||||
@Nullable Duration duration,
|
||||
@Nullable Duration durationBeforeNow,
|
||||
@Nullable String datasource
|
||||
)
|
||||
{
|
||||
|
@ -240,18 +240,18 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
}.reverse();
|
||||
|
||||
return maxTaskStatuses == null ?
|
||||
getRecentlyFinishedTaskInfoSince(
|
||||
DateTimes.nowUtc().minus(duration == null ? config.getRecentlyFinishedThreshold() : duration),
|
||||
getRecentlyCreatedAlreadyFinishedTaskInfoSince(
|
||||
DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
|
||||
createdDateDesc
|
||||
) :
|
||||
getNRecentlyFinishedTaskInfo(maxTaskStatuses, createdDateDesc);
|
||||
getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc);
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfoSince(
|
||||
private List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfoSince(
|
||||
DateTime start,
|
||||
Ordering<TaskStuff> createdDateDesc
|
||||
)
|
||||
|
@ -262,7 +262,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
List<TaskStuff> list = createdDateDesc
|
||||
.sortedCopy(tasks.values())
|
||||
.stream()
|
||||
.filter(taskStuff -> taskStuff.getStatus().isComplete())
|
||||
.filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.createdDate.isAfter(start))
|
||||
.collect(Collectors.toList());
|
||||
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
|
||||
for (final TaskStuff taskStuff : list) {
|
||||
|
@ -283,7 +283,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
}
|
||||
}
|
||||
|
||||
private List<TaskInfo<Task, TaskStatus>> getNRecentlyFinishedTaskInfo(int n, Ordering<TaskStuff> createdDateDesc)
|
||||
private List<TaskInfo<Task, TaskStatus>> getNRecentlyCreatedAlreadyFinishedTaskInfo(int n, Ordering<TaskStuff> createdDateDesc)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
|
@ -291,6 +291,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
List<TaskStuff> list = createdDateDesc
|
||||
.sortedCopy(tasks.values())
|
||||
.stream()
|
||||
.filter(taskStuff -> taskStuff.getStatus().isComplete())
|
||||
.limit(n)
|
||||
.collect(Collectors.toList());
|
||||
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
|
||||
|
|
|
@ -208,15 +208,15 @@ public class MetadataTaskStorage implements TaskStorage
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
|
||||
public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
|
||||
@Nullable Integer maxTaskStatuses,
|
||||
@Nullable Duration duration,
|
||||
@Nullable Duration durationBeforeNow,
|
||||
@Nullable String datasource
|
||||
)
|
||||
{
|
||||
return ImmutableList.copyOf(
|
||||
handler.getCompletedTaskInfo(
|
||||
DateTimes.nowUtc().minus(duration == null ? config.getRecentlyFinishedThreshold() : duration),
|
||||
DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
|
||||
maxTaskStatuses,
|
||||
datasource
|
||||
)
|
||||
|
|
|
@ -141,14 +141,14 @@ public interface TaskStorage
|
|||
* return nothing.
|
||||
*
|
||||
* @param maxTaskStatuses maxTaskStatuses
|
||||
* @param duration duration
|
||||
* @param durationBeforeNow duration
|
||||
* @param datasource datasource
|
||||
*
|
||||
* @return list of {@link TaskInfo}
|
||||
*/
|
||||
List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
|
||||
List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
|
||||
@Nullable Integer maxTaskStatuses,
|
||||
@Nullable Duration duration,
|
||||
@Nullable Duration durationBeforeNow,
|
||||
@Nullable String datasource
|
||||
);
|
||||
|
||||
|
|
|
@ -58,13 +58,13 @@ public class TaskStorageQueryAdapter
|
|||
return storage.getActiveTaskInfo(dataSource);
|
||||
}
|
||||
|
||||
public List<TaskInfo<Task, TaskStatus>> getRecentlyCompletedTaskInfo(
|
||||
public List<TaskInfo<Task, TaskStatus>> getCompletedTaskInfoByCreatedTimeDuration(
|
||||
@Nullable Integer maxTaskStatuses,
|
||||
@Nullable Duration duration,
|
||||
@Nullable String dataSource
|
||||
)
|
||||
{
|
||||
return storage.getRecentlyFinishedTaskInfo(maxTaskStatuses, duration, dataSource);
|
||||
return storage.getRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, duration, dataSource);
|
||||
}
|
||||
|
||||
public Optional<Task> getTask(final String taskid)
|
||||
|
|
|
@ -627,7 +627,7 @@ public class OverlordResource
|
|||
public Response getTasks(
|
||||
@QueryParam("state") final String state,
|
||||
@QueryParam("datasource") final String dataSource,
|
||||
@QueryParam("interval") final String interval,
|
||||
@PathParam("createdTimeInterval") final String createdTimeInterval,
|
||||
@QueryParam("max") final Integer maxCompletedTasks,
|
||||
@QueryParam("type") final String type,
|
||||
@Context final HttpServletRequest req
|
||||
|
@ -692,13 +692,13 @@ public class OverlordResource
|
|||
|
||||
//checking for complete tasks first to avoid querying active tasks if user only wants complete tasks
|
||||
if (state == null || "complete".equals(StringUtils.toLowerCase(state))) {
|
||||
Duration duration = null;
|
||||
if (interval != null) {
|
||||
final Interval theInterval = Intervals.of(interval.replace('_', '/'));
|
||||
duration = theInterval.toDuration();
|
||||
Duration createdTimeDuration = null;
|
||||
if (createdTimeInterval != null) {
|
||||
final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/"));
|
||||
createdTimeDuration = theInterval.toDuration();
|
||||
}
|
||||
final List<TaskInfo<Task, TaskStatus>> taskInfoList =
|
||||
taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(maxCompletedTasks, duration, dataSource);
|
||||
taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(maxCompletedTasks, createdTimeDuration, dataSource);
|
||||
final List<TaskStatusPlus> completedTasks = taskInfoList.stream()
|
||||
.map(completeTaskTransformFunc::apply)
|
||||
.collect(Collectors.toList());
|
||||
|
|
|
@ -233,7 +233,7 @@ public class OverlordResourceTest
|
|||
new MockTaskRunnerWorkItem(tasksIds.get(1), null),
|
||||
new MockTaskRunnerWorkItem(tasksIds.get(2), null)));
|
||||
|
||||
EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn(
|
||||
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
|
||||
ImmutableList.of(
|
||||
new TaskInfo(
|
||||
"id_1",
|
||||
|
@ -259,7 +259,7 @@ public class OverlordResourceTest
|
|||
)
|
||||
);
|
||||
EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
|
||||
Assert.assertTrue(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null).size() == 3);
|
||||
Assert.assertTrue(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null).size() == 3);
|
||||
Assert.assertTrue(taskRunner.getRunningTasks().size() == 3);
|
||||
List<TaskStatusPlus> responseObjects = (List) overlordResource
|
||||
.getCompleteTasks(null, req).getEntity();
|
||||
|
@ -313,7 +313,7 @@ public class OverlordResourceTest
|
|||
{
|
||||
expectAuthorizationTokenCheck();
|
||||
//completed tasks
|
||||
EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn(
|
||||
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
|
||||
ImmutableList.of(
|
||||
new TaskInfo(
|
||||
"id_5",
|
||||
|
@ -403,7 +403,7 @@ public class OverlordResourceTest
|
|||
{
|
||||
expectAuthorizationTokenCheck();
|
||||
//completed tasks
|
||||
EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, "allow")).andStubReturn(
|
||||
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, "allow")).andStubReturn(
|
||||
ImmutableList.of(
|
||||
new TaskInfo(
|
||||
"id_5",
|
||||
|
@ -667,7 +667,7 @@ public class OverlordResourceTest
|
|||
public void testGetTasksFilterCompleteState()
|
||||
{
|
||||
expectAuthorizationTokenCheck();
|
||||
EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn(
|
||||
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
|
||||
ImmutableList.of(
|
||||
new TaskInfo(
|
||||
"id_1",
|
||||
|
@ -707,7 +707,7 @@ public class OverlordResourceTest
|
|||
expectAuthorizationTokenCheck();
|
||||
List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3");
|
||||
Duration duration = new Period("PT86400S").toStandardDuration();
|
||||
EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, duration, null)).andStubReturn(
|
||||
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, duration, null)).andStubReturn(
|
||||
ImmutableList.of(
|
||||
new TaskInfo(
|
||||
"id_1",
|
||||
|
@ -747,7 +747,7 @@ public class OverlordResourceTest
|
|||
public void testGetNullCompleteTask()
|
||||
{
|
||||
expectAuthorizationTokenCheck();
|
||||
EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn(
|
||||
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
|
||||
ImmutableList.of(
|
||||
new TaskInfo(
|
||||
"id_1",
|
||||
|
|
Loading…
Reference in New Issue