From 414487a78e63f552c0c93cb711fe0c06ada0a3ac Mon Sep 17 00:00:00 2001 From: Surekha Date: Thu, 19 Jul 2018 16:33:46 -0700 Subject: [PATCH] Add support to filter on datasource for active tasks (#5998) * Add support to filter on datasource for active tasks * Added datasource filter to sql query for active tasks * Fixed unit tests * Address PR comments --- .../MetadataStorageActionHandler.java | 2 +- .../overlord/HeapMemoryTaskStorage.java | 2 +- .../overlord/MetadataTaskStorage.java | 4 +- .../druid/indexing/overlord/TaskStorage.java | 7 ++- .../overlord/TaskStorageQueryAdapter.java | 4 +- .../overlord/http/OverlordResource.java | 4 +- .../overlord/http/OverlordResourceTest.java | 14 ++--- .../DerbyMetadataStorageActionHandler.java | 8 +-- .../MySQLMetadataStorageActionHandler.java | 8 +-- ...ostgreSQLMetadataStorageActionHandler.java | 8 +-- .../SQLMetadataStorageActionHandler.java | 52 +++++++++++++++---- ...SQLServerMetadataStorageActionHandler.java | 8 +-- 12 files changed, 78 insertions(+), 43 deletions(-) diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java b/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java index 892bf761716..dca2dd006b9 100644 --- a/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java +++ b/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java @@ -120,7 +120,7 @@ public interface MetadataStorageActionHandler> getActiveTaskInfo(); + List> getActiveTaskInfo(@Nullable String dataSource); /** * Return createdDate and dataSource for the given id diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java index 2e91c3f6026..341cf8ae41d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -170,7 +170,7 @@ public class HeapMemoryTaskStorage implements TaskStorage } @Override - public List> getActiveTaskInfo() + public List> getActiveTaskInfo(@Nullable String dataSource) { giant.lock(); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java index e8489cffdb1..b927497b022 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java @@ -214,10 +214,10 @@ public class MetadataTaskStorage implements TaskStorage } @Override - public List> getActiveTaskInfo() + public List> getActiveTaskInfo(@Nullable String dataSource) { return ImmutableList.copyOf( - handler.getActiveTaskInfo() + handler.getActiveTaskInfo(dataSource) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java index da7a07ab76f..b24dd35a123 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java @@ -127,9 +127,11 @@ public interface TaskStorage * Returns a list of currently running or pending tasks as stored in the storage facility as {@link TaskInfo}. No particular order * is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. * + * @param datasource datasource + * * @return list of {@link TaskInfo} */ - List> getActiveTaskInfo(); + List> getActiveTaskInfo(@Nullable String dataSource); /** * Returns up to {@code maxTaskStatuses} {@link TaskInfo} objects of recently finished tasks as stored in the storage facility. No @@ -137,7 +139,8 @@ public interface TaskStorage * No particular standard of "recent" is guaranteed, and in fact, this method is permitted to simply return nothing. * * @param maxTaskStatuses maxTaskStatuses - * @param duration duration + * @param duration duration + * @param datasource datasource * * @return list of {@link TaskInfo} */ diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java index 4c3815cf9aa..c1cd2b3b7df 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -55,9 +55,9 @@ public class TaskStorageQueryAdapter return storage.getActiveTasks(); } - public List> getActiveTaskInfo() + public List> getActiveTaskInfo(@Nullable String dataSource) { - return storage.getActiveTaskInfo(); + return storage.getActiveTaskInfo(dataSource); } public List> getRecentlyCompletedTaskInfo( diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 895ea8ff3d5..68b5ff69167 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -629,10 +629,10 @@ public class OverlordResource finalTaskList.addAll(completedTasks); } - List> allActiveTaskInfo = Lists.newArrayList(); + final List> allActiveTaskInfo; final List allActiveTasks = Lists.newArrayList(); if (state == null || !"complete".equals(StringUtils.toLowerCase(state))) { - allActiveTaskInfo = taskStorageQueryAdapter.getActiveTaskInfo(); + allActiveTaskInfo = taskStorageQueryAdapter.getActiveTaskInfo(dataSource); for (final TaskInfo task : allActiveTaskInfo) { allActiveTasks.add( new AnyTask( diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index a8ad29c8d35..3a02c8bee2d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -172,7 +172,7 @@ public class OverlordResourceTest public void testSecuredGetWaitingTask() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -278,7 +278,7 @@ public class OverlordResourceTest new MockTaskRunnerWorkItem(tasksIds.get(1), null) ) ); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -337,7 +337,7 @@ public class OverlordResourceTest ) ); //active tasks - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -427,7 +427,7 @@ public class OverlordResourceTest ) ); //active tasks - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -491,7 +491,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); //active tasks - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -549,7 +549,7 @@ public class OverlordResourceTest public void testGetTasksFilterRunningState() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -615,7 +615,7 @@ public class OverlordResourceTest new MockTaskRunnerWorkItem(tasksIds.get(1), null) ) ); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", diff --git a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java index a6453327cd6..ce05ec2c903 100644 --- a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java +++ b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java @@ -46,7 +46,7 @@ public class DerbyMetadataStorageActionHandler> createInactiveStatusesSinceQuery( - Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource + Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource ) { String sql = StringUtils.format( @@ -59,7 +59,7 @@ public class DerbyMetadataStorageActionHandler> createInactiveStatusesSinceQuery( - Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource + Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource ) { String sql = StringUtils.format( @@ -59,7 +59,7 @@ public class MySQLMetadataStorageActionHandler> createInactiveStatusesSinceQuery( - Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource + Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource ) { String sql = StringUtils.format( @@ -59,7 +59,7 @@ public class PostgreSQLMetadataStorageActionHandler> getCompletedTaskInfo( DateTime timestamp, @Nullable Integer maxNumStatuses, - @Nullable String datasource + @Nullable String dataSource ) { return getConnector().retryWithHandle( @@ -342,7 +342,7 @@ public abstract class SQLMetadataStorageActionHandler> getActiveTaskInfo() + public List> getActiveTaskInfo(@Nullable String dataSource) { return getConnector().retryWithHandle( handle -> { - return handle.createQuery( - StringUtils.format( - "SELECT id, status_payload, payload, datasource, created_date FROM %s WHERE active = TRUE ORDER BY created_date", - entryTable - ) - ).map(new TaskInfoMapper()).list(); + final Query> query = createActiveStatusesQuery( + handle, + dataSource + ); + return query.map(new TaskInfoMapper()).list(); } ); } + private Query> createActiveStatusesQuery(Handle handle, @Nullable String dataSource) + { + String sql = StringUtils.format( + "SELECT " + + " id, " + + " status_payload, " + + " payload, " + + " datasource, " + + " created_date " + + "FROM " + + " %s " + + "WHERE " + + getWhereClauseForActiveStatusesQuery(dataSource) + + "ORDER BY created_date", + entryTable + ); + + Query> query = handle.createQuery(sql); + if (dataSource != null) { + query = query.bind("ds", dataSource); + } + return query; + } + + private String getWhereClauseForActiveStatusesQuery(String dataSource) + { + String sql = StringUtils.format("active = TRUE "); + if (dataSource != null) { + sql += " AND datasource = :ds "; + } + return sql; + } + class TaskInfoMapper implements ResultSetMapper> { @Override @@ -401,7 +433,7 @@ public abstract class SQLMetadataStorageActionHandler> createInactiveStatusesSinceQuery( - Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource + Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource ) { String sql = maxNumStatuses == null ? "SELECT " : "SELECT TOP (:n) "; @@ -60,7 +60,7 @@ public class SQLServerMetadataStorageActionHandler