From 81c9a6177cc6958942eaa9ea294fe96c4a3708f4 Mon Sep 17 00:00:00 2001 From: Marat Date: Wed, 21 Nov 2018 00:42:44 +0300 Subject: [PATCH] Added support for filtering by unused parameter for HeapMemoryTaskStorage (#6510) * 1. added support for unused DateTime start parameter in getRecentlyFinishedTaskInfoSince method: HeapMemoryTaskStorage.getRecentlyFinishedTaskInfoSince return the finished tasks by comparing TaskStuff.createdDate with the start time 2. added filtering by status complete to TaskStuff list stream in HeapMemoryTaskStorage.getNRecentlyFinishedTaskInfo method. 3. changed names of methods and parameters to present that public API method OverlordResource.getTasks return the list of completed tasks, which createdDate, not date of completion, belongs to the interval parameter. * 1. added support for unused DateTime start parameter in getRecentlyFinishedTaskInfoSince method: HeapMemoryTaskStorage.getRecentlyFinishedTaskInfoSince return the finished tasks by comparing TaskStuff.createdDate with the start time 2. added filtering by status complete to TaskStuff list stream in HeapMemoryTaskStorage.getNRecentlyFinishedTaskInfo method. 3. changed names of methods and parameters to present that public API method OverlordResource.getTasks return the list of completed tasks, which createdDate, not date of completion, belongs to the interval parameter. * Fixed OverlordResourceTest to Support changed methods names * Changed methods and parameters names to make them more obvious to understand. * Changed String.replace() for the StringUtils.replace()(#6607) * Fixed checkstyle error --- .../overlord/HeapMemoryTaskStorage.java | 17 +++++++++-------- .../indexing/overlord/MetadataTaskStorage.java | 6 +++--- .../druid/indexing/overlord/TaskStorage.java | 6 +++--- .../overlord/TaskStorageQueryAdapter.java | 4 ++-- .../overlord/http/OverlordResource.java | 12 ++++++------ .../overlord/http/OverlordResourceTest.java | 14 +++++++------- 6 files changed, 30 insertions(+), 29 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 006286ac841..51b38d02609 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -221,9 +221,9 @@ public class HeapMemoryTaskStorage implements TaskStorage } @Override - public List> getRecentlyFinishedTaskInfo( + public List> getRecentlyCreatedAlreadyFinishedTaskInfo( @Nullable Integer maxTaskStatuses, - @Nullable Duration duration, + @Nullable Duration durationBeforeNow, @Nullable String datasource ) { @@ -240,18 +240,18 @@ public class HeapMemoryTaskStorage implements TaskStorage }.reverse(); return maxTaskStatuses == null ? - getRecentlyFinishedTaskInfoSince( - DateTimes.nowUtc().minus(duration == null ? config.getRecentlyFinishedThreshold() : duration), + getRecentlyCreatedAlreadyFinishedTaskInfoSince( + DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), createdDateDesc ) : - getNRecentlyFinishedTaskInfo(maxTaskStatuses, createdDateDesc); + getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc); } finally { giant.unlock(); } } - private List> getRecentlyFinishedTaskInfoSince( + private List> getRecentlyCreatedAlreadyFinishedTaskInfoSince( DateTime start, Ordering createdDateDesc ) @@ -262,7 +262,7 @@ public class HeapMemoryTaskStorage implements TaskStorage List list = createdDateDesc .sortedCopy(tasks.values()) .stream() - .filter(taskStuff -> taskStuff.getStatus().isComplete()) + .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.createdDate.isAfter(start)) .collect(Collectors.toList()); final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : list) { @@ -283,7 +283,7 @@ public class HeapMemoryTaskStorage implements TaskStorage } } - private List> getNRecentlyFinishedTaskInfo(int n, Ordering createdDateDesc) + private List> getNRecentlyCreatedAlreadyFinishedTaskInfo(int n, Ordering createdDateDesc) { giant.lock(); @@ -291,6 +291,7 @@ public class HeapMemoryTaskStorage implements TaskStorage List list = createdDateDesc .sortedCopy(tasks.values()) .stream() + .filter(taskStuff -> taskStuff.getStatus().isComplete()) .limit(n) .collect(Collectors.toList()); final ImmutableList.Builder> listBuilder = ImmutableList.builder(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 808fdb79729..f0d9d37167f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -208,15 +208,15 @@ public class MetadataTaskStorage implements TaskStorage } @Override - public List> getRecentlyFinishedTaskInfo( + public List> getRecentlyCreatedAlreadyFinishedTaskInfo( @Nullable Integer maxTaskStatuses, - @Nullable Duration duration, + @Nullable Duration durationBeforeNow, @Nullable String datasource ) { return ImmutableList.copyOf( handler.getCompletedTaskInfo( - DateTimes.nowUtc().minus(duration == null ? config.getRecentlyFinishedThreshold() : duration), + DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), maxTaskStatuses, datasource ) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index b2f55f0c9d4..1edd52c2c38 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -141,14 +141,14 @@ public interface TaskStorage * return nothing. * * @param maxTaskStatuses maxTaskStatuses - * @param duration duration + * @param durationBeforeNow duration * @param datasource datasource * * @return list of {@link TaskInfo} */ - List> getRecentlyFinishedTaskInfo( + List> getRecentlyCreatedAlreadyFinishedTaskInfo( @Nullable Integer maxTaskStatuses, - @Nullable Duration duration, + @Nullable Duration durationBeforeNow, @Nullable String datasource ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index fd61752a7f2..6a12b40bc11 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -58,13 +58,13 @@ public class TaskStorageQueryAdapter return storage.getActiveTaskInfo(dataSource); } - public List> getRecentlyCompletedTaskInfo( + public List> getCompletedTaskInfoByCreatedTimeDuration( @Nullable Integer maxTaskStatuses, @Nullable Duration duration, @Nullable String dataSource ) { - return storage.getRecentlyFinishedTaskInfo(maxTaskStatuses, duration, dataSource); + return storage.getRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, duration, dataSource); } public Optional getTask(final String taskid) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index f0748f9a3af..92f01ac5a6f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -627,7 +627,7 @@ public class OverlordResource public Response getTasks( @QueryParam("state") final String state, @QueryParam("datasource") final String dataSource, - @QueryParam("interval") final String interval, + @PathParam("createdTimeInterval") final String createdTimeInterval, @QueryParam("max") final Integer maxCompletedTasks, @QueryParam("type") final String type, @Context final HttpServletRequest req @@ -692,13 +692,13 @@ public class OverlordResource //checking for complete tasks first to avoid querying active tasks if user only wants complete tasks if (state == null || "complete".equals(StringUtils.toLowerCase(state))) { - Duration duration = null; - if (interval != null) { - final Interval theInterval = Intervals.of(interval.replace('_', '/')); - duration = theInterval.toDuration(); + Duration createdTimeDuration = null; + if (createdTimeInterval != null) { + final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/")); + createdTimeDuration = theInterval.toDuration(); } final List> taskInfoList = - taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(maxCompletedTasks, duration, dataSource); + taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(maxCompletedTasks, createdTimeDuration, dataSource); final List completedTasks = taskInfoList.stream() .map(completeTaskTransformFunc::apply) .collect(Collectors.toList()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 7964c764069..e3885f75596 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -233,7 +233,7 @@ public class OverlordResourceTest new MockTaskRunnerWorkItem(tasksIds.get(1), null), new MockTaskRunnerWorkItem(tasksIds.get(2), null))); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -259,7 +259,7 @@ public class OverlordResourceTest ) ); EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); - Assert.assertTrue(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null).size() == 3); + Assert.assertTrue(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null).size() == 3); Assert.assertTrue(taskRunner.getRunningTasks().size() == 3); List responseObjects = (List) overlordResource .getCompleteTasks(null, req).getEntity(); @@ -313,7 +313,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); //completed tasks - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_5", @@ -403,7 +403,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); //completed tasks - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, "allow")).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, "allow")).andStubReturn( ImmutableList.of( new TaskInfo( "id_5", @@ -667,7 +667,7 @@ public class OverlordResourceTest public void testGetTasksFilterCompleteState() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -707,7 +707,7 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); List tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); Duration duration = new Period("PT86400S").toStandardDuration(); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, duration, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, duration, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -747,7 +747,7 @@ public class OverlordResourceTest public void testGetNullCompleteTask() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1",