Add support to filter on datasource for active tasks (#5998)

* Add support to filter on datasource for active tasks

* Added datasource filter to sql query for active tasks
* Fixed unit tests

* Address PR comments
This commit is contained in:
Surekha 2018-07-19 16:33:46 -07:00 committed by Jonathan Wei
parent 4a2df2b23a
commit 414487a78e
12 changed files with 78 additions and 43 deletions

View File

@ -120,7 +120,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* *
* @return list of {@link TaskInfo} * @return list of {@link TaskInfo}
*/ */
List<TaskInfo<EntryType>> getActiveTaskInfo(); List<TaskInfo<EntryType>> getActiveTaskInfo(@Nullable String dataSource);
/** /**
* Return createdDate and dataSource for the given id * Return createdDate and dataSource for the given id

View File

@ -170,7 +170,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
} }
@Override @Override
public List<TaskInfo<Task>> getActiveTaskInfo() public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
{ {
giant.lock(); giant.lock();

View File

@ -214,10 +214,10 @@ public class MetadataTaskStorage implements TaskStorage
} }
@Override @Override
public List<TaskInfo<Task>> getActiveTaskInfo() public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
{ {
return ImmutableList.copyOf( return ImmutableList.copyOf(
handler.getActiveTaskInfo() handler.getActiveTaskInfo(dataSource)
); );
} }

View File

@ -127,9 +127,11 @@ public interface TaskStorage
* Returns a list of currently running or pending tasks as stored in the storage facility as {@link TaskInfo}. No particular order * Returns a list of currently running or pending tasks as stored in the storage facility as {@link TaskInfo}. No particular order
* is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. * is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.
* *
* @param datasource datasource
*
* @return list of {@link TaskInfo} * @return list of {@link TaskInfo}
*/ */
List<TaskInfo<Task>> getActiveTaskInfo(); List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource);
/** /**
* Returns up to {@code maxTaskStatuses} {@link TaskInfo} objects of recently finished tasks as stored in the storage facility. No * Returns up to {@code maxTaskStatuses} {@link TaskInfo} objects of recently finished tasks as stored in the storage facility. No
@ -137,7 +139,8 @@ public interface TaskStorage
* No particular standard of "recent" is guaranteed, and in fact, this method is permitted to simply return nothing. * No particular standard of "recent" is guaranteed, and in fact, this method is permitted to simply return nothing.
* *
* @param maxTaskStatuses maxTaskStatuses * @param maxTaskStatuses maxTaskStatuses
* @param duration duration * @param duration duration
* @param datasource datasource
* *
* @return list of {@link TaskInfo} * @return list of {@link TaskInfo}
*/ */

View File

@ -55,9 +55,9 @@ public class TaskStorageQueryAdapter
return storage.getActiveTasks(); return storage.getActiveTasks();
} }
public List<TaskInfo<Task>> getActiveTaskInfo() public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
{ {
return storage.getActiveTaskInfo(); return storage.getActiveTaskInfo(dataSource);
} }
public List<TaskInfo<Task>> getRecentlyCompletedTaskInfo( public List<TaskInfo<Task>> getRecentlyCompletedTaskInfo(

View File

@ -629,10 +629,10 @@ public class OverlordResource
finalTaskList.addAll(completedTasks); finalTaskList.addAll(completedTasks);
} }
List<TaskInfo<Task>> allActiveTaskInfo = Lists.newArrayList(); final List<TaskInfo<Task>> allActiveTaskInfo;
final List<AnyTask> allActiveTasks = Lists.newArrayList(); final List<AnyTask> allActiveTasks = Lists.newArrayList();
if (state == null || !"complete".equals(StringUtils.toLowerCase(state))) { if (state == null || !"complete".equals(StringUtils.toLowerCase(state))) {
allActiveTaskInfo = taskStorageQueryAdapter.getActiveTaskInfo(); allActiveTaskInfo = taskStorageQueryAdapter.getActiveTaskInfo(dataSource);
for (final TaskInfo<Task> task : allActiveTaskInfo) { for (final TaskInfo<Task> task : allActiveTaskInfo) {
allActiveTasks.add( allActiveTasks.add(
new AnyTask( new AnyTask(

View File

@ -172,7 +172,7 @@ public class OverlordResourceTest
public void testSecuredGetWaitingTask() public void testSecuredGetWaitingTask()
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo(
"id_1", "id_1",
@ -278,7 +278,7 @@ public class OverlordResourceTest
new MockTaskRunnerWorkItem(tasksIds.get(1), null) new MockTaskRunnerWorkItem(tasksIds.get(1), null)
) )
); );
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo(
"id_1", "id_1",
@ -337,7 +337,7 @@ public class OverlordResourceTest
) )
); );
//active tasks //active tasks
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo(
"id_1", "id_1",
@ -427,7 +427,7 @@ public class OverlordResourceTest
) )
); );
//active tasks //active tasks
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo(
"id_1", "id_1",
@ -491,7 +491,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
//active tasks //active tasks
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo(
"id_1", "id_1",
@ -549,7 +549,7 @@ public class OverlordResourceTest
public void testGetTasksFilterRunningState() public void testGetTasksFilterRunningState()
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo(
"id_1", "id_1",
@ -615,7 +615,7 @@ public class OverlordResourceTest
new MockTaskRunnerWorkItem(tasksIds.get(1), null) new MockTaskRunnerWorkItem(tasksIds.get(1), null)
) )
); );
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn( EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo(
"id_1", "id_1",

View File

@ -46,7 +46,7 @@ public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, L
@Override @Override
protected Query<Map<String, Object>> createInactiveStatusesSinceQuery( protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
) )
{ {
String sql = StringUtils.format( String sql = StringUtils.format(
@ -59,7 +59,7 @@ public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, L
+ "FROM " + "FROM "
+ " %s " + " %s "
+ "WHERE " + "WHERE "
+ getWhereClauseForInactiveStatusesSinceQuery(datasource) + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+ "ORDER BY created_date DESC", + "ORDER BY created_date DESC",
getEntryTable() getEntryTable()
); );
@ -72,8 +72,8 @@ public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, L
if (maxNumStatuses != null) { if (maxNumStatuses != null) {
query = query.bind("n", maxNumStatuses); query = query.bind("n", maxNumStatuses);
} }
if (datasource != null) { if (dataSource != null) {
query = query.bind("ds", datasource); query = query.bind("ds", dataSource);
} }
return query; return query;
} }

View File

@ -46,7 +46,7 @@ public class MySQLMetadataStorageActionHandler<EntryType, StatusType, LogType, L
@Override @Override
protected Query<Map<String, Object>> createInactiveStatusesSinceQuery( protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
) )
{ {
String sql = StringUtils.format( String sql = StringUtils.format(
@ -59,7 +59,7 @@ public class MySQLMetadataStorageActionHandler<EntryType, StatusType, LogType, L
+ "FROM " + "FROM "
+ " %s " + " %s "
+ "WHERE " + "WHERE "
+ getWhereClauseForInactiveStatusesSinceQuery(datasource) + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+ "ORDER BY created_date DESC", + "ORDER BY created_date DESC",
getEntryTable() getEntryTable()
); );
@ -72,8 +72,8 @@ public class MySQLMetadataStorageActionHandler<EntryType, StatusType, LogType, L
if (maxNumStatuses != null) { if (maxNumStatuses != null) {
query = query.bind("n", maxNumStatuses); query = query.bind("n", maxNumStatuses);
} }
if (datasource != null) { if (dataSource != null) {
query = query.bind("ds", datasource); query = query.bind("ds", dataSource);
} }
return query; return query;
} }

View File

@ -46,7 +46,7 @@ public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogTy
@Override @Override
protected Query<Map<String, Object>> createInactiveStatusesSinceQuery( protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
) )
{ {
String sql = StringUtils.format( String sql = StringUtils.format(
@ -59,7 +59,7 @@ public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogTy
+ "FROM " + "FROM "
+ " %s " + " %s "
+ "WHERE " + "WHERE "
+ getWhereClauseForInactiveStatusesSinceQuery(datasource) + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+ "ORDER BY created_date DESC", + "ORDER BY created_date DESC",
getEntryTable() getEntryTable()
); );
@ -73,8 +73,8 @@ public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogTy
if (maxNumStatuses != null) { if (maxNumStatuses != null) {
query = query.bind("n", maxNumStatuses); query = query.bind("n", maxNumStatuses);
} }
if (datasource != null) { if (dataSource != null) {
query = query.bind("ds", datasource); query = query.bind("ds", dataSource);
} }
return query; return query;
} }

View File

@ -333,7 +333,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
public List<TaskInfo<EntryType>> getCompletedTaskInfo( public List<TaskInfo<EntryType>> getCompletedTaskInfo(
DateTime timestamp, DateTime timestamp,
@Nullable Integer maxNumStatuses, @Nullable Integer maxNumStatuses,
@Nullable String datasource @Nullable String dataSource
) )
{ {
return getConnector().retryWithHandle( return getConnector().retryWithHandle(
@ -342,7 +342,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
handle, handle,
timestamp, timestamp,
maxNumStatuses, maxNumStatuses,
datasource dataSource
); );
return query.map(new TaskInfoMapper()).list(); return query.map(new TaskInfoMapper()).list();
} }
@ -350,20 +350,52 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
} }
@Override @Override
public List<TaskInfo<EntryType>> getActiveTaskInfo() public List<TaskInfo<EntryType>> getActiveTaskInfo(@Nullable String dataSource)
{ {
return getConnector().retryWithHandle( return getConnector().retryWithHandle(
handle -> { handle -> {
return handle.createQuery( final Query<Map<String, Object>> query = createActiveStatusesQuery(
StringUtils.format( handle,
"SELECT id, status_payload, payload, datasource, created_date FROM %s WHERE active = TRUE ORDER BY created_date", dataSource
entryTable );
) return query.map(new TaskInfoMapper()).list();
).map(new TaskInfoMapper()).list();
} }
); );
} }
private Query<Map<String, Object>> createActiveStatusesQuery(Handle handle, @Nullable String dataSource)
{
String sql = StringUtils.format(
"SELECT "
+ " id, "
+ " status_payload, "
+ " payload, "
+ " datasource, "
+ " created_date "
+ "FROM "
+ " %s "
+ "WHERE "
+ getWhereClauseForActiveStatusesQuery(dataSource)
+ "ORDER BY created_date",
entryTable
);
Query<Map<String, Object>> query = handle.createQuery(sql);
if (dataSource != null) {
query = query.bind("ds", dataSource);
}
return query;
}
private String getWhereClauseForActiveStatusesQuery(String dataSource)
{
String sql = StringUtils.format("active = TRUE ");
if (dataSource != null) {
sql += " AND datasource = :ds ";
}
return sql;
}
class TaskInfoMapper implements ResultSetMapper<TaskInfo<EntryType>> class TaskInfoMapper implements ResultSetMapper<TaskInfo<EntryType>>
{ {
@Override @Override
@ -401,7 +433,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
Handle handle, Handle handle,
DateTime timestamp, DateTime timestamp,
@Nullable Integer maxNumStatuses, @Nullable Integer maxNumStatuses,
@Nullable String datasource @Nullable String dataSource
); );
@Override @Override

View File

@ -46,7 +46,7 @@ public class SQLServerMetadataStorageActionHandler<EntryType, StatusType, LogTyp
@Override @Override
protected Query<Map<String, Object>> createInactiveStatusesSinceQuery( protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
) )
{ {
String sql = maxNumStatuses == null ? "SELECT " : "SELECT TOP (:n) "; String sql = maxNumStatuses == null ? "SELECT " : "SELECT TOP (:n) ";
@ -60,7 +60,7 @@ public class SQLServerMetadataStorageActionHandler<EntryType, StatusType, LogTyp
+ "FROM " + "FROM "
+ " %s " + " %s "
+ "WHERE " + "WHERE "
+ getWhereClauseForInactiveStatusesSinceQuery(datasource) + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+ "ORDER BY created_date DESC", + "ORDER BY created_date DESC",
getEntryTable() getEntryTable()
); );
@ -70,8 +70,8 @@ public class SQLServerMetadataStorageActionHandler<EntryType, StatusType, LogTyp
if (maxNumStatuses != null) { if (maxNumStatuses != null) {
query = query.bind("n", maxNumStatuses); query = query.bind("n", maxNumStatuses);
} }
if (datasource != null) { if (dataSource != null) {
query = query.bind("ds", datasource); query = query.bind("ds", dataSource);
} }
return query; return query;
} }