diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 006286ac841..51b38d02609 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -221,9 +221,9 @@ public class HeapMemoryTaskStorage implements TaskStorage } @Override - public List> getRecentlyFinishedTaskInfo( + public List> getRecentlyCreatedAlreadyFinishedTaskInfo( @Nullable Integer maxTaskStatuses, - @Nullable Duration duration, + @Nullable Duration durationBeforeNow, @Nullable String datasource ) { @@ -240,18 +240,18 @@ public class HeapMemoryTaskStorage implements TaskStorage }.reverse(); return maxTaskStatuses == null ? - getRecentlyFinishedTaskInfoSince( - DateTimes.nowUtc().minus(duration == null ? config.getRecentlyFinishedThreshold() : duration), + getRecentlyCreatedAlreadyFinishedTaskInfoSince( + DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), createdDateDesc ) : - getNRecentlyFinishedTaskInfo(maxTaskStatuses, createdDateDesc); + getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc); } finally { giant.unlock(); } } - private List> getRecentlyFinishedTaskInfoSince( + private List> getRecentlyCreatedAlreadyFinishedTaskInfoSince( DateTime start, Ordering createdDateDesc ) @@ -262,7 +262,7 @@ public class HeapMemoryTaskStorage implements TaskStorage List list = createdDateDesc .sortedCopy(tasks.values()) .stream() - .filter(taskStuff -> taskStuff.getStatus().isComplete()) + .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.createdDate.isAfter(start)) .collect(Collectors.toList()); final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : list) { @@ -283,7 +283,7 @@ public class HeapMemoryTaskStorage implements TaskStorage } } - private List> getNRecentlyFinishedTaskInfo(int n, Ordering createdDateDesc) + private List> getNRecentlyCreatedAlreadyFinishedTaskInfo(int n, Ordering createdDateDesc) { giant.lock(); @@ -291,6 +291,7 @@ public class HeapMemoryTaskStorage implements TaskStorage List list = createdDateDesc .sortedCopy(tasks.values()) .stream() + .filter(taskStuff -> taskStuff.getStatus().isComplete()) .limit(n) .collect(Collectors.toList()); final ImmutableList.Builder> listBuilder = ImmutableList.builder(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 808fdb79729..f0d9d37167f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -208,15 +208,15 @@ public class MetadataTaskStorage implements TaskStorage } @Override - public List> getRecentlyFinishedTaskInfo( + public List> getRecentlyCreatedAlreadyFinishedTaskInfo( @Nullable Integer maxTaskStatuses, - @Nullable Duration duration, + @Nullable Duration durationBeforeNow, @Nullable String datasource ) { return ImmutableList.copyOf( handler.getCompletedTaskInfo( - DateTimes.nowUtc().minus(duration == null ? config.getRecentlyFinishedThreshold() : duration), + DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), maxTaskStatuses, datasource ) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index b2f55f0c9d4..1edd52c2c38 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -141,14 +141,14 @@ public interface TaskStorage * return nothing. * * @param maxTaskStatuses maxTaskStatuses - * @param duration duration + * @param durationBeforeNow duration * @param datasource datasource * * @return list of {@link TaskInfo} */ - List> getRecentlyFinishedTaskInfo( + List> getRecentlyCreatedAlreadyFinishedTaskInfo( @Nullable Integer maxTaskStatuses, - @Nullable Duration duration, + @Nullable Duration durationBeforeNow, @Nullable String datasource ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index fd61752a7f2..6a12b40bc11 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -58,13 +58,13 @@ public class TaskStorageQueryAdapter return storage.getActiveTaskInfo(dataSource); } - public List> getRecentlyCompletedTaskInfo( + public List> getCompletedTaskInfoByCreatedTimeDuration( @Nullable Integer maxTaskStatuses, @Nullable Duration duration, @Nullable String dataSource ) { - return storage.getRecentlyFinishedTaskInfo(maxTaskStatuses, duration, dataSource); + return storage.getRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, duration, dataSource); } public Optional getTask(final String taskid) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index f0748f9a3af..92f01ac5a6f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -627,7 +627,7 @@ public class OverlordResource public Response getTasks( @QueryParam("state") final String state, @QueryParam("datasource") final String dataSource, - @QueryParam("interval") final String interval, + @PathParam("createdTimeInterval") final String createdTimeInterval, @QueryParam("max") final Integer maxCompletedTasks, @QueryParam("type") final String type, @Context final HttpServletRequest req @@ -692,13 +692,13 @@ public class OverlordResource //checking for complete tasks first to avoid querying active tasks if user only wants complete tasks if (state == null || "complete".equals(StringUtils.toLowerCase(state))) { - Duration duration = null; - if (interval != null) { - final Interval theInterval = Intervals.of(interval.replace('_', '/')); - duration = theInterval.toDuration(); + Duration createdTimeDuration = null; + if (createdTimeInterval != null) { + final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/")); + createdTimeDuration = theInterval.toDuration(); } final List> taskInfoList = - taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(maxCompletedTasks, duration, dataSource); + taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(maxCompletedTasks, createdTimeDuration, dataSource); final List completedTasks = taskInfoList.stream() .map(completeTaskTransformFunc::apply) .collect(Collectors.toList()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 7964c764069..e3885f75596 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -233,7 +233,7 @@ public class OverlordResourceTest new MockTaskRunnerWorkItem(tasksIds.get(1), null), new MockTaskRunnerWorkItem(tasksIds.get(2), null))); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -259,7 +259,7 @@ public class OverlordResourceTest ) ); EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); - Assert.assertTrue(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null).size() == 3); + Assert.assertTrue(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null).size() == 3); Assert.assertTrue(taskRunner.getRunningTasks().size() == 3); List responseObjects = (List) overlordResource .getCompleteTasks(null, req).getEntity(); @@ -313,7 +313,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); //completed tasks - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_5", @@ -403,7 +403,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); //completed tasks - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, "allow")).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, "allow")).andStubReturn( ImmutableList.of( new TaskInfo( "id_5", @@ -667,7 +667,7 @@ public class OverlordResourceTest public void testGetTasksFilterCompleteState() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -707,7 +707,7 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); List tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); Duration duration = new Period("PT86400S").toStandardDuration(); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, duration, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, duration, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -747,7 +747,7 @@ public class OverlordResourceTest public void testGetNullCompleteTask() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1",