Fix a race condition in the '/tasks' Overlord API (#12330)

* finds complete and active tasks from the same snapshot

* overlord resource

* unit test

* integration test

* javadoc and cleanup

* more cleanup

* fix test and add more
Jihoon Son authored 2022-03-17 10:47:45 +09:00, committed by GitHub
parent d745d0b338
commit 5e23674fe5
23 changed files with 1270 additions and 853 deletions
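
To make the first bullet concrete: with the lookup types added below, a caller can fetch complete and active tasks through one storage call instead of two separate queries. A minimal caller-side sketch, assuming a TaskStorage instance named taskStorage and the usual Guava/Joda imports; the limit of 20, the one-day window, and the "wikipedia" datasource are made up for illustration:

    Map<TaskLookupType, TaskLookup> lookups = ImmutableMap.of(
        TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance(),
        TaskLookupType.COMPLETE, CompleteTaskLookup.of(20, Duration.standardDays(1))
    );
    // One call resolves both groups from the same metadata-store snapshot, so a task that
    // flips from active to complete between two separate queries can no longer be missed.
    List<TaskInfo<Task, TaskStatus>> tasks = taskStorage.getTaskInfos(lookups, "wikipedia");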

View File

@ -25,6 +25,8 @@ package org.apache.druid.indexer;
  */
 public enum RunnerTaskState
 {
+  // Waiting tasks are not tracked.
+  // Instead, they are computed by (all tasks in metadata store - all tasks in taskRunner).
   WAITING,
   PENDING,
   RUNNING,

View File

@ -77,4 +77,3 @@ public class TaskInfo<EntryType, StatusType>
     return task;
   }
 }

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexer;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@ -66,10 +67,10 @@ public class TaskStatus
   }

   /**
-   * This method is deprecated because it does not handle the error message of failed task status properly.
+   * This method is deprecated for production because it does not handle the error message of failed task status properly.
    * Use {@link #success(String)} or {@link #failure(String, String)} instead.
    */
-  @Deprecated
+  @VisibleForTesting
   public static TaskStatus fromCode(String taskId, TaskState code)
   {
     return new TaskStatus(taskId, code, -1, null, null);

View File

@ -48,7 +48,7 @@ public class TaskStatusPlus
   public TaskStatusPlus(
       String id,
       @Nullable String groupId,
-      String type, // nullable for backward compatibility
+      @Nullable String type, // nullable for backward compatibility
       DateTime createdTime,
       DateTime queueInsertionTime,
       @Nullable TaskState statusCode,

View File

@ -21,10 +21,12 @@ package org.apache.druid.metadata;
 import com.google.common.base.Optional;
 import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.metadata.TaskLookup.TaskLookupType;
 import org.joda.time.DateTime;

 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@ -82,26 +84,28 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
   TaskInfo<EntryType, StatusType> getTaskInfo(String entryId);

   /**
-   * Return up to {@code maxNumStatuses} {@link TaskInfo} objects for all inactive entries
-   * created on or later than the given timestamp
+   * Returns a list of {@link TaskInfo} from the metadata store that match the given filters.
    *
-   * @param timestamp timestamp
-   * @param maxNumStatuses maxNumStatuses
+   * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
+   * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
+   * store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
+   * All lookups should be processed atomically when more than one lookup is given.
    *
-   * @return list of {@link TaskInfo}
+   * @param taskLookups task lookup types and filters
+   * @param datasource  datasource filter
    */
-  List<TaskInfo<EntryType, StatusType>> getCompletedTaskInfo(
-      DateTime timestamp,
-      @Nullable Integer maxNumStatuses,
+  List<TaskInfo<EntryType, StatusType>> getTaskInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
       @Nullable String datasource
   );

-  /**
-   * Return {@link TaskInfo} objects for all active entries
-   *
-   * @return list of {@link TaskInfo}
-   */
-  List<TaskInfo<EntryType, StatusType>> getActiveTaskInfo(@Nullable String dataSource);
+  default List<TaskInfo<EntryType, StatusType>> getTaskInfos(
+      TaskLookup taskLookup,
+      @Nullable String datasource
+  )
+  {
+    return getTaskInfos(Collections.singletonMap(taskLookup.getType(), taskLookup), datasource);
+  }
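
The single-lookup default method above only wraps its argument in a singleton map, so the two calls below are equivalent. A sketch, assuming a handler parameterized with Task and TaskStatus and a hypothetical "wikipedia" datasource:

    // Convenience overload added in this PR:
    List<TaskInfo<Task, TaskStatus>> active =
        handler.getTaskInfos(ActiveTaskLookup.getInstance(), "wikipedia");

    // ...is the same as spelling out the singleton map yourself:
    List<TaskInfo<Task, TaskStatus>> activeAgain = handler.getTaskInfos(
        Collections.singletonMap(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
        "wikipedia"
    );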
   /**
    * Add a lock to the given entry

View File

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* Lookup types and parameters for task lookups in the metadata store.
*/
public interface TaskLookup
{
/**
* Task state in the metadata store.
* Complete tasks are the tasks that have been either succeeded or failed.
* Active tasks are the tasks that are not complete tasks.
*/
enum TaskLookupType
{
ACTIVE,
COMPLETE
}
TaskLookupType getType();
/**
* Task lookup for complete tasks. It includes optional filters for task lookups.
* When the filters are given, the task lookup returns only the tasks that satisfy all filters.
*/
class CompleteTaskLookup implements TaskLookup
{
/**
* Limits the number of taskStatuses to return.
*/
@Nullable
private final Integer maxTaskStatuses;
/**
* Returns only the tasks created prior to the given timestamp.
*/
@Nullable
private final DateTime tasksCreatedPriorTo;
public static CompleteTaskLookup of(
@Nullable Integer maxTaskStatuses,
@Nullable Duration durationBeforeNow
)
{
return new CompleteTaskLookup(
maxTaskStatuses,
durationBeforeNow == null ? null : computeTimestampPriorToNow(durationBeforeNow)
);
}
@VisibleForTesting
public static CompleteTaskLookup withTasksCreatedPriorTo(
@Nullable Integer maxTaskStatuses,
@Nullable DateTime tasksCreatedPriorTo
)
{
return new CompleteTaskLookup(maxTaskStatuses, tasksCreatedPriorTo);
}
private CompleteTaskLookup(
@Nullable Integer maxTaskStatuses,
@Nullable DateTime tasksCreatedPriorTo
)
{
this.maxTaskStatuses = maxTaskStatuses;
this.tasksCreatedPriorTo = tasksCreatedPriorTo;
}
public boolean hasTaskCreatedTimeFilter()
{
return tasksCreatedPriorTo != null;
}
public CompleteTaskLookup withDurationBeforeNow(Duration durationBeforeNow)
{
return CompleteTaskLookup.of(
maxTaskStatuses,
Preconditions.checkNotNull(durationBeforeNow, "durationBeforeNow")
);
}
private static DateTime computeTimestampPriorToNow(Duration durationBeforeNow)
{
return DateTimes
.nowUtc()
.minus(durationBeforeNow);
}
public DateTime getTasksCreatedPriorTo()
{
assert tasksCreatedPriorTo != null;
return tasksCreatedPriorTo;
}
@Nullable
public Integer getMaxTaskStatuses()
{
return maxTaskStatuses;
}
@Override
public TaskLookupType getType()
{
return TaskLookupType.COMPLETE;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompleteTaskLookup that = (CompleteTaskLookup) o;
return Objects.equals(maxTaskStatuses, that.maxTaskStatuses)
&& Objects.equals(tasksCreatedPriorTo, that.tasksCreatedPriorTo);
}
@Override
public int hashCode()
{
return Objects.hash(maxTaskStatuses, tasksCreatedPriorTo);
}
}
class ActiveTaskLookup implements TaskLookup
{
private static final ActiveTaskLookup INSTANCE = new ActiveTaskLookup();
public static ActiveTaskLookup getInstance()
{
return INSTANCE;
}
private ActiveTaskLookup()
{
}
@Override
public TaskLookupType getType()
{
return TaskLookupType.ACTIVE;
}
@Override
public int hashCode()
{
return 0;
}
@Override
public boolean equals(Object obj)
{
return obj instanceof ActiveTaskLookup;
}
}
}
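
A short sketch of how these lookups are meant to be constructed (the 100 and one-day values are arbitrary; Joda-Time imports are assumed):

    // Complete-task lookup with both optional filters set: at most 100 statuses,
    // only tasks created within the last day.
    CompleteTaskLookup recent = CompleteTaskLookup.of(100, Duration.standardDays(1));
    boolean hasTimeFilter = recent.hasTaskCreatedTimeFilter();  // true: derived from the duration
    DateTime cutoff = recent.getTasksCreatedPriorTo();          // fixed to (now - duration) at creation time

    // No filters at all. Callers such as MetadataTaskStorage later apply a default window via
    // withDurationBeforeNow(config.getRecentlyFinishedThreshold()) when no time filter was given.
    CompleteTaskLookup unbounded = CompleteTaskLookup.of(null, null);

    // The active-task lookup carries no parameters, so it is a singleton.
    ActiveTaskLookup active = ActiveTaskLookup.getInstance();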

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
@RunWith(Enclosed.class)
public class TaskLookupTest
{
public static class CompleteTaskLookupTest
{
@Test
public void testEquals()
{
EqualsVerifier.forClass(CompleteTaskLookup.class).usingGetClass().verify();
}
@Test
public void testGetType()
{
Assert.assertEquals(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null).getType());
}
@Test
public void testNullParams()
{
final CompleteTaskLookup lookup = CompleteTaskLookup.of(null, null);
Assert.assertNull(lookup.getMaxTaskStatuses());
Assert.assertFalse(lookup.hasTaskCreatedTimeFilter());
Assert.assertThrows(AssertionError.class, lookup::getTasksCreatedPriorTo);
}
@Test
public void testWithDurationBeforeNow()
{
final Duration duration = new Period("P1D").toStandardDuration();
final DateTime timestampBeforeLookupCreated = DateTimes.nowUtc().minus(duration);
final CompleteTaskLookup lookup = CompleteTaskLookup
.of(null, null)
.withDurationBeforeNow(duration);
Assert.assertNull(lookup.getMaxTaskStatuses());
Assert.assertTrue(
timestampBeforeLookupCreated.isEqual(lookup.getTasksCreatedPriorTo())
|| timestampBeforeLookupCreated.isBefore(lookup.getTasksCreatedPriorTo())
);
}
@Test
public void testNonNullParams()
{
final Duration duration = new Period("P1D").toStandardDuration();
final DateTime timestampBeforeLookupCreated = DateTimes.nowUtc().minus(duration);
final CompleteTaskLookup lookup = CompleteTaskLookup.of(3, duration);
Assert.assertNotNull(lookup.getMaxTaskStatuses());
Assert.assertEquals(3, lookup.getMaxTaskStatuses().intValue());
Assert.assertTrue(lookup.hasTaskCreatedTimeFilter());
Assert.assertTrue(
timestampBeforeLookupCreated.isEqual(lookup.getTasksCreatedPriorTo())
|| timestampBeforeLookupCreated.isBefore(lookup.getTasksCreatedPriorTo())
);
}
}
public static class ActiveTaskLookupTest
{
@Test
public void testSingleton()
{
final ActiveTaskLookup lookup1 = ActiveTaskLookup.getInstance();
final ActiveTaskLookup lookup2 = ActiveTaskLookup.getInstance();
Assert.assertEquals(lookup1, lookup2);
Assert.assertSame(lookup1, lookup2);
}
@Test
public void testGetType()
{
Assert.assertEquals(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance().getType());
}
}
}

View File

@ -37,15 +37,19 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.metadata.TaskLookup;
+import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
+import org.apache.druid.metadata.TaskLookup.TaskLookupType;
 import org.joda.time.DateTime;
-import org.joda.time.Duration;

 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 /**
  * Implements an in-heap TaskStorage facility, with no persistence across restarts. This class
@ -166,23 +170,21 @@ public class HeapMemoryTaskStorage implements TaskStorage
return listBuilder.build(); return listBuilder.build();
} }
@Override
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource) public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
{ {
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder(); final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
for (final TaskStuff taskStuff : tasks.values()) { for (final TaskStuff taskStuff : tasks.values()) {
if (taskStuff.getStatus().isRunnable()) { if (taskStuff.getStatus().isRunnable()) {
TaskInfo t = TaskStuff.toTaskInfo(taskStuff); if (dataSource == null || dataSource.equals(taskStuff.getDataSource())) {
listBuilder.add(t); listBuilder.add(TaskStuff.toTaskInfo(taskStuff));
}
} }
} }
return listBuilder.build(); return listBuilder.build();
} }
@Override
public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo( public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
@Nullable Integer maxTaskStatuses, CompleteTaskLookup taskLookup,
@Nullable Duration durationBeforeNow,
@Nullable String datasource @Nullable String datasource
) )
{ {
@ -195,39 +197,57 @@ public class HeapMemoryTaskStorage implements TaskStorage
} }
}.reverse(); }.reverse();
return maxTaskStatuses == null ? return getRecentlyCreatedAlreadyFinishedTaskInfoSince(
getRecentlyCreatedAlreadyFinishedTaskInfoSince( taskLookup.getTasksCreatedPriorTo(),
DateTimes.nowUtc() taskLookup.getMaxTaskStatuses(),
.minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), createdDateDesc
createdDateDesc );
) : }
getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc);
/**
* NOTE: This method is racy as it searches for complete tasks and active tasks separately outside a lock.
* This method should be used only for testing.
*/
@Override
public List<TaskInfo<Task, TaskStatus>> getTaskInfos(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
)
{
final List<TaskInfo<Task, TaskStatus>> tasks = new ArrayList<>();
taskLookups.forEach((type, lookup) -> {
if (type == TaskLookupType.COMPLETE) {
CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) lookup;
tasks.addAll(
getRecentlyCreatedAlreadyFinishedTaskInfo(
completeTaskLookup.hasTaskCreatedTimeFilter()
? completeTaskLookup
: completeTaskLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold()),
datasource
)
);
} else {
tasks.addAll(getActiveTaskInfo(datasource));
}
});
return tasks;
} }
private List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfoSince( private List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfoSince(
DateTime start, DateTime start,
@Nullable Integer n,
Ordering<TaskStuff> createdDateDesc Ordering<TaskStuff> createdDateDesc
) )
{ {
List<TaskInfo<Task, TaskStatus>> list = tasks.values() Stream<TaskStuff> stream = tasks
.values()
.stream() .stream()
.filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().isAfter(start)) .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().isAfter(start))
.sorted(createdDateDesc) .sorted(createdDateDesc);
.map(TaskStuff::toTaskInfo) if (n != null) {
.collect(Collectors.toList()); stream = stream.limit(n);
return Collections.unmodifiableList(list); }
} List<TaskInfo<Task, TaskStatus>> list = stream
private List<TaskInfo<Task, TaskStatus>> getNRecentlyCreatedAlreadyFinishedTaskInfo(
int n,
Ordering<TaskStuff> createdDateDesc
)
{
List<TaskInfo<Task, TaskStatus>> list = tasks.values()
.stream()
.filter(taskStuff -> taskStuff.getStatus().isComplete())
.sorted(createdDateDesc)
.limit(n)
.map(TaskStuff::toTaskInfo) .map(TaskStuff::toTaskInfo)
.collect(Collectors.toList()); .collect(Collectors.toList());
return Collections.unmodifiableList(list); return Collections.unmodifiableList(list);
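
The NOTE above is the core of this PR: querying complete tasks and active tasks separately leaves a window in which a task that finishes between the two reads shows up in neither result. A toy illustration of that window, using purely hypothetical data rather than Druid code:

    Set<String> activeTasks = new HashSet<>(Collections.singleton("task1"));
    Set<String> completeTasks = new HashSet<>();

    List<String> response = new ArrayList<>(completeTasks);  // 1) read complete tasks: empty

    activeTasks.remove("task1");                              // 2) task1 finishes in between
    completeTasks.add("task1");

    response.addAll(activeTasks);                             // 3) read active tasks: empty again
    // "task1" is absent from the response even though it existed the whole time. A
    // getTaskInfos(ACTIVE + COMPLETE) call answered from one snapshot, as the metadata-store
    // implementation is required to do, cannot lose it this way.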

View File

@ -25,6 +25,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import org.apache.druid.indexer.TaskInfo;
 import org.apache.druid.indexer.TaskStatus;
@ -43,11 +44,16 @@ import org.apache.druid.metadata.MetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.MetadataStorageActionHandlerTypes;
 import org.apache.druid.metadata.MetadataStorageConnector;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
-import org.joda.time.Duration;
+import org.apache.druid.metadata.TaskLookup;
+import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
+import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
+import org.apache.druid.metadata.TaskLookup.TaskLookupType;

 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.stream.Collectors;

 public class MetadataTaskStorage implements TaskStorage
@ -191,7 +197,7 @@ public class MetadataTaskStorage implements TaskStorage
{ {
// filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module // filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module
// and don't know what to do with the payload, so we won't be able to make use of it anyway // and don't know what to do with the payload, so we won't be able to make use of it anyway
return handler.getActiveTaskInfo(null) return handler.getTaskInfos(Collections.singletonMap(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null)
.stream() .stream()
.filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null)
.map(TaskInfo::getTask) .map(TaskInfo::getTask)
@ -201,7 +207,10 @@ public class MetadataTaskStorage implements TaskStorage
@Override @Override
public List<Task> getActiveTasksByDatasource(String datasource) public List<Task> getActiveTasksByDatasource(String datasource)
{ {
List<TaskInfo<Task, TaskStatus>> activeTaskInfos = handler.getActiveTaskInfo(datasource); List<TaskInfo<Task, TaskStatus>> activeTaskInfos = handler.getTaskInfos(
Collections.singletonMap(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
datasource
);
ImmutableList.Builder<Task> tasksBuilder = ImmutableList.builder(); ImmutableList.Builder<Task> tasksBuilder = ImmutableList.builder();
for (TaskInfo<Task, TaskStatus> taskInfo : activeTaskInfos) { for (TaskInfo<Task, TaskStatus> taskInfo : activeTaskInfos) {
if (taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) { if (taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) {
@ -212,28 +221,26 @@ public class MetadataTaskStorage implements TaskStorage
} }
@Override @Override
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource) public List<TaskInfo<Task, TaskStatus>> getTaskInfos(
{ Map<TaskLookupType, TaskLookup> taskLookups,
return ImmutableList.copyOf(
handler.getActiveTaskInfo(dataSource)
);
}
@Override
public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
@Nullable Integer maxTaskStatuses,
@Nullable Duration durationBeforeNow,
@Nullable String datasource @Nullable String datasource
) )
{ {
return ImmutableList.copyOf( Map<TaskLookupType, TaskLookup> theTaskLookups = Maps.newHashMapWithExpectedSize(taskLookups.size());
handler.getCompletedTaskInfo( for (Entry<TaskLookupType, TaskLookup> entry : taskLookups.entrySet()) {
DateTimes.nowUtc() if (entry.getKey() == TaskLookupType.COMPLETE) {
.minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue();
maxTaskStatuses, theTaskLookups.put(
datasource entry.getKey(),
) completeTaskLookup.hasTaskCreatedTimeFilter()
); ? completeTaskLookup
: completeTaskLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold())
);
} else {
theTaskLookups.put(entry.getKey(), entry.getValue());
}
}
return Collections.unmodifiableList(handler.getTaskInfos(theTaskLookups, datasource));
} }
@Override @Override

View File

@ -26,10 +26,13 @@ import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.metadata.EntryExistsException;
import org.joda.time.Duration; import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
public interface TaskStorage public interface TaskStorage
{ {
@ -148,31 +151,28 @@ public interface TaskStorage
   List<Task> getActiveTasksByDatasource(String datasource);

   /**
-   * Returns a list of currently running or pending tasks as stored in the storage facility as {@link TaskInfo}. No
+   * Returns a list of tasks stored in the storage facility as {@link TaskInfo}. No
    * particular order is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.
    *
-   * @return list of {@link TaskInfo}
-   */
-  List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource);
-
-  /**
-   * Returns up to {@code maxTaskStatuses} {@link TaskInfo} objects of recently finished tasks as stored in the storage
-   * facility. No particular order is guaranteed, but implementations are encouraged to return tasks in descending order
-   * of creation. No particular standard of "recent" is guaranteed, and in fact, this method is permitted to simply
-   * return nothing.
+   * The returned list can contain active tasks and complete tasks depending on the {@code taskLookups} parameter.
+   * See {@link TaskLookup} for more details of active and complete tasks.
    *
-   * @param maxTaskStatuses maxTaskStatuses
-   * @param durationBeforeNow duration
-   * @param datasource datasource
-   *
-   * @return list of {@link TaskInfo}
+   * @param taskLookups lookup types and filters
+   * @param datasource  datasource filter
    */
-  List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
-      @Nullable Integer maxTaskStatuses,
-      @Nullable Duration durationBeforeNow,
+  List<TaskInfo<Task, TaskStatus>> getTaskInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
       @Nullable String datasource
   );

+  default List<TaskInfo<Task, TaskStatus>> getTaskInfos(
+      TaskLookup taskLookup,
+      @Nullable String datasource
+  )
+  {
+    return getTaskInfos(Collections.singletonMap(taskLookup.getType(), taskLookup), datasource);
+  }
   /**
    * Returns a list of locks for a particular task.
    *

View File

@ -27,6 +27,10 @@ import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -75,7 +79,10 @@ public class TaskStorageQueryAdapter
   public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
   {
-    return storage.getActiveTaskInfo(dataSource);
+    return storage.getTaskInfos(
+        ActiveTaskLookup.getInstance(),
+        dataSource
+    );
   }

   public List<TaskInfo<Task, TaskStatus>> getCompletedTaskInfoByCreatedTimeDuration(
@ -84,7 +91,15 @@ public class TaskStorageQueryAdapter
       @Nullable String dataSource
   )
   {
-    return storage.getRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, duration, dataSource);
+    return storage.getTaskInfos(CompleteTaskLookup.of(maxTaskStatuses, duration), dataSource);
+  }
+
+  public List<TaskInfo<Task, TaskStatus>> getTaskInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource
+  )
+  {
+    return storage.getTaskInfos(taskLookups, dataSource);
   }
public Optional<Task> getTask(final String taskid) public Optional<Task> getTask(final String taskid)

View File

@ -26,8 +26,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters; import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditEntry;
@ -39,7 +37,6 @@ import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient;
@ -60,10 +57,15 @@ import org.apache.druid.indexing.overlord.http.security.TaskResourceFilter;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.apache.druid.server.http.HttpMediaType; import org.apache.druid.server.http.HttpMediaType;
import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.http.security.DatasourceResourceFilter; import org.apache.druid.server.http.security.DatasourceResourceFilter;
@ -78,7 +80,6 @@ import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType; import org.apache.druid.server.security.ResourceType;
import org.apache.druid.tasklogs.TaskLogStreamer; import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -108,6 +109,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* *
@ -130,6 +132,24 @@ public class OverlordResource
private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null; private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete"); private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete");
private enum TaskStateLookup
{
ALL,
WAITING,
PENDING,
RUNNING,
COMPLETE;
private static TaskStateLookup fromString(@Nullable String state)
{
if (state == null) {
return ALL;
} else {
return TaskStateLookup.valueOf(StringUtils.toUpperCase(state));
}
}
}
@Inject @Inject
public OverlordResource( public OverlordResource(
TaskMaster taskMaster, TaskMaster taskMaster,
@ -665,100 +685,229 @@ public class OverlordResource
); );
} }
} }
List<TaskStatusPlus> finalTaskList = new ArrayList<>();
Function<AnyTask, TaskStatusPlus> activeTaskTransformFunc = workItem -> new TaskStatusPlus( return asLeaderWith(
workItem.getTaskId(), taskMaster.getTaskRunner(),
workItem.getTaskGroupId(), taskRunner -> {
workItem.getTaskType(), final List<TaskStatusPlus> authorizedList = securedTaskStatusPlus(
workItem.getCreatedTime(), getTasks(
workItem.getQueueInsertionTime(), taskRunner,
workItem.getTaskState(), TaskStateLookup.fromString(state),
workItem.getRunnerTaskState(), dataSource,
null, createdTimeInterval,
workItem.getLocation(), maxCompletedTasks,
workItem.getDataSource(), type
null ),
dataSource,
req
);
return Response.ok(authorizedList).build();
}
); );
}
Function<TaskInfo<Task, TaskStatus>, TaskStatusPlus> completeTaskTransformFunc = taskInfo -> new TaskStatusPlus( private List<TaskStatusPlus> getTasks(
taskInfo.getId(), TaskRunner taskRunner,
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), TaskStateLookup state,
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), @Nullable String dataSource,
taskInfo.getCreatedTime(), @Nullable String createdTimeInterval,
// Would be nice to include the real queue insertion time, but the @Nullable Integer maxCompletedTasks,
// TaskStorage API doesn't yet allow it. @Nullable String type
DateTimes.EPOCH, )
taskInfo.getStatus().getStatusCode(), {
RunnerTaskState.NONE, final Duration createdTimeDuration;
taskInfo.getStatus().getDuration(), if (createdTimeInterval != null) {
taskInfo.getStatus().getLocation() == null ? TaskLocation.unknown() : taskInfo.getStatus().getLocation(), final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/"));
taskInfo.getDataSource(), createdTimeDuration = theInterval.toDuration();
taskInfo.getStatus().getErrorMsg() } else {
); createdTimeDuration = null;
//checking for complete tasks first to avoid querying active tasks if user only wants complete tasks
if (state == null || "complete".equals(StringUtils.toLowerCase(state))) {
Duration createdTimeDuration = null;
if (createdTimeInterval != null) {
final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/"));
createdTimeDuration = theInterval.toDuration();
}
final List<TaskInfo<Task, TaskStatus>> taskInfoList =
taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(maxCompletedTasks, createdTimeDuration, dataSource);
final List<TaskStatusPlus> completedTasks = taskInfoList.stream()
.map(completeTaskTransformFunc::apply)
.collect(Collectors.toList());
finalTaskList.addAll(completedTasks);
} }
final List<TaskInfo<Task, TaskStatus>> allActiveTaskInfo; // Ideally, snapshotting in taskStorage and taskRunner should be done atomically,
final List<AnyTask> allActiveTasks = new ArrayList<>(); // but there is no way to do it today.
if (state == null || !"complete".equals(StringUtils.toLowerCase(state))) { // Instead, we first gets a snapshot from taskStorage and then one from taskRunner.
allActiveTaskInfo = taskStorageQueryAdapter.getActiveTaskInfo(dataSource); // This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process
for (final TaskInfo<Task, TaskStatus> task : allActiveTaskInfo) { // and use the snapshot from taskRunner as a reference for potential task state updates happened
allActiveTasks.add( // after the first snapshotting.
new AnyTask( Stream<TaskInfo<Task, TaskStatus>> taskInfoStreamFromTaskStorage = getTaskInfoStreamFromTaskStorage(
task.getId(), state,
task.getTask() == null ? null : task.getTask().getGroupId(),
task.getTask() == null ? null : task.getTask().getType(),
SettableFuture.create(),
task.getDataSource(),
null,
null,
task.getCreatedTime(),
DateTimes.EPOCH,
TaskLocation.unknown()
));
}
}
if (state == null || "waiting".equals(StringUtils.toLowerCase(state))) {
final List<AnyTask> waitingWorkItems = filterActiveTasks(RunnerTaskState.WAITING, allActiveTasks);
List<TaskStatusPlus> transformedWaitingList = waitingWorkItems.stream()
.map(activeTaskTransformFunc::apply)
.collect(Collectors.toList());
finalTaskList.addAll(transformedWaitingList);
}
if (state == null || "pending".equals(StringUtils.toLowerCase(state))) {
final List<AnyTask> pendingWorkItems = filterActiveTasks(RunnerTaskState.PENDING, allActiveTasks);
List<TaskStatusPlus> transformedPendingList = pendingWorkItems.stream()
.map(activeTaskTransformFunc::apply)
.collect(Collectors.toList());
finalTaskList.addAll(transformedPendingList);
}
if (state == null || "running".equals(StringUtils.toLowerCase(state))) {
final List<AnyTask> runningWorkItems = filterActiveTasks(RunnerTaskState.RUNNING, allActiveTasks);
List<TaskStatusPlus> transformedRunningList = runningWorkItems.stream()
.map(activeTaskTransformFunc::apply)
.collect(Collectors.toList());
finalTaskList.addAll(transformedRunningList);
}
final List<TaskStatusPlus> authorizedList = securedTaskStatusPlus(
finalTaskList,
dataSource, dataSource,
type, createdTimeDuration,
req maxCompletedTasks,
type
); );
return Response.ok(authorizedList).build(); final Map<String, ? extends TaskRunnerWorkItem> runnerWorkItems = getTaskRunnerWorkItems(
taskRunner,
state,
dataSource,
type
);
if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) {
// We are interested in only those tasks which are in taskRunner.
taskInfoStreamFromTaskStorage = taskInfoStreamFromTaskStorage
.filter(info -> runnerWorkItems.containsKey(info.getId()));
}
final List<TaskInfo<Task, TaskStatus>> taskInfoFromTaskStorage = taskInfoStreamFromTaskStorage
.collect(Collectors.toList());
// Separate complete and active tasks from taskStorage.
// Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType.
final List<TaskInfo<Task, TaskStatus>> completeTaskInfoFromTaskStorage = new ArrayList<>();
final List<TaskInfo<Task, TaskStatus>> activeTaskInfoFromTaskStorage = new ArrayList<>();
for (TaskInfo<Task, TaskStatus> info : taskInfoFromTaskStorage) {
if (info.getStatus().isComplete()) {
completeTaskInfoFromTaskStorage.add(info);
} else {
activeTaskInfoFromTaskStorage.add(info);
}
}
final List<TaskStatusPlus> statuses = new ArrayList<>();
completeTaskInfoFromTaskStorage.forEach(taskInfo -> statuses.add(
new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
taskInfo.getCreatedTime(),
DateTimes.EPOCH,
taskInfo.getStatus().getStatusCode(),
RunnerTaskState.NONE,
taskInfo.getStatus().getDuration(),
taskInfo.getStatus().getLocation(),
taskInfo.getDataSource(),
taskInfo.getStatus().getErrorMsg()
)
));
activeTaskInfoFromTaskStorage.forEach(taskInfo -> {
final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(taskInfo.getId());
if (runnerWorkItem == null) {
// a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner.
if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) {
statuses.add(
new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
taskInfo.getCreatedTime(),
DateTimes.EPOCH,
taskInfo.getStatus().getStatusCode(),
RunnerTaskState.WAITING,
taskInfo.getStatus().getDuration(),
taskInfo.getStatus().getLocation(),
taskInfo.getDataSource(),
taskInfo.getStatus().getErrorMsg()
)
);
}
} else {
if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) {
statuses.add(
new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
runnerWorkItem.getCreatedTime(),
runnerWorkItem.getQueueInsertionTime(),
taskInfo.getStatus().getStatusCode(),
taskRunner.getRunnerTaskState(taskInfo.getId()), // this is racy for remoteTaskRunner
taskInfo.getStatus().getDuration(),
runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done.
taskInfo.getDataSource(),
taskInfo.getStatus().getErrorMsg()
)
);
}
}
});
return statuses;
}
private Stream<TaskInfo<Task, TaskStatus>> getTaskInfoStreamFromTaskStorage(
TaskStateLookup state,
@Nullable String dataSource,
Duration createdTimeDuration,
@Nullable Integer maxCompletedTasks,
@Nullable String type
)
{
final Map<TaskLookupType, TaskLookup> taskLookups;
switch (state) {
case ALL:
taskLookups = ImmutableMap.of(
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance(),
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration)
);
break;
case COMPLETE:
taskLookups = ImmutableMap.of(
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration)
);
break;
case WAITING:
case PENDING:
case RUNNING:
taskLookups = ImmutableMap.of(
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()
);
break;
default:
throw new IAE("Unknown state: [%s]", state);
}
final Stream<TaskInfo<Task, TaskStatus>> taskInfoStreamFromTaskStorage = taskStorageQueryAdapter.getTaskInfos(
taskLookups,
dataSource
).stream();
if (type != null) {
return taskInfoStreamFromTaskStorage.filter(
info -> type.equals(info.getTask() == null ? null : info.getTask().getType())
);
} else {
return taskInfoStreamFromTaskStorage;
}
}
private Map<String, ? extends TaskRunnerWorkItem> getTaskRunnerWorkItems(
TaskRunner taskRunner,
TaskStateLookup state,
@Nullable String dataSource,
@Nullable String type
)
{
Stream<? extends TaskRunnerWorkItem> runnerWorkItemsStream;
switch (state) {
case ALL:
case WAITING:
// waiting tasks can be found by (all tasks in taskStorage - all tasks in taskRunner)
runnerWorkItemsStream = taskRunner.getKnownTasks().stream();
break;
case PENDING:
runnerWorkItemsStream = taskRunner.getPendingTasks().stream();
break;
case RUNNING:
runnerWorkItemsStream = taskRunner.getRunningTasks().stream();
break;
case COMPLETE:
runnerWorkItemsStream = Stream.empty();
break;
default:
throw new IAE("Unknown state: [%s]", state);
}
if (dataSource != null) {
runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> dataSource.equals(item.getDataSource()));
}
if (type != null) {
runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> type.equals(item.getTaskType()));
}
return runnerWorkItemsStream
.collect(Collectors.toMap(TaskRunnerWorkItem::getTaskId, item -> item));
} }
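
Condensing the logic above into one place: the storage snapshot is taken first and is the source of truth for which tasks exist; the runner snapshot only refines runner-side state, and an active task the runner does not know about yet is reported as WAITING. A hypothetical helper, not part of the PR, that captures the classification:

    private RunnerTaskState classify(
        TaskInfo<Task, TaskStatus> storageInfo,
        Map<String, ? extends TaskRunnerWorkItem> runnerWorkItems,
        TaskRunner taskRunner
    )
    {
      if (storageInfo.getStatus().isComplete()) {
        return RunnerTaskState.NONE;                  // complete tasks are no longer in any runner
      } else if (!runnerWorkItems.containsKey(storageInfo.getId())) {
        return RunnerTaskState.WAITING;               // in the metadata store but not handed to the runner yet
      } else {
        // PENDING or RUNNING; as noted above, this is racy for the remote task runner.
        return taskRunner.getRunnerTaskState(storageInfo.getId());
      }
    }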
@DELETE @DELETE
@ -944,106 +1093,9 @@ public class OverlordResource
} }
} }
private List<AnyTask> filterActiveTasks(
RunnerTaskState state,
List<AnyTask> allTasks
)
{
//divide active tasks into 3 lists : running, pending, waiting
Optional<TaskRunner> taskRunnerOpt = taskMaster.getTaskRunner();
if (!taskRunnerOpt.isPresent()) {
throw new WebApplicationException(
Response.serverError().entity("No task runner found").build()
);
}
TaskRunner runner = taskRunnerOpt.get();
// the order of tasks below is waiting, pending, running to prevent
// skipping a task, it's the order in which tasks will change state
// if they do while this is code is executing, so a task might be
// counted twice but never skipped
if (RunnerTaskState.WAITING.equals(state)) {
Collection<? extends TaskRunnerWorkItem> runnersKnownTasks = runner.getKnownTasks();
Set<String> runnerKnownTaskIds = runnersKnownTasks
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.collect(Collectors.toSet());
final List<AnyTask> waitingTasks = new ArrayList<>();
for (TaskRunnerWorkItem task : allTasks) {
if (!runnerKnownTaskIds.contains(task.getTaskId())) {
waitingTasks.add(((AnyTask) task).withTaskState(
TaskState.RUNNING,
RunnerTaskState.WAITING,
task.getCreatedTime(),
task.getQueueInsertionTime(),
task.getLocation()
));
}
}
return waitingTasks;
}
if (RunnerTaskState.PENDING.equals(state)) {
Collection<? extends TaskRunnerWorkItem> knownPendingTasks = runner.getPendingTasks();
Set<String> pendingTaskIds = knownPendingTasks
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.collect(Collectors.toSet());
Map<String, TaskRunnerWorkItem> workItemIdMap = knownPendingTasks
.stream()
.collect(Collectors.toMap(
TaskRunnerWorkItem::getTaskId,
java.util.function.Function.identity(),
(previousWorkItem, newWorkItem) -> newWorkItem
));
final List<AnyTask> pendingTasks = new ArrayList<>();
for (TaskRunnerWorkItem task : allTasks) {
if (pendingTaskIds.contains(task.getTaskId())) {
pendingTasks.add(((AnyTask) task).withTaskState(
TaskState.RUNNING,
RunnerTaskState.PENDING,
workItemIdMap.get(task.getTaskId()).getCreatedTime(),
workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(),
workItemIdMap.get(task.getTaskId()).getLocation()
));
}
}
return pendingTasks;
}
if (RunnerTaskState.RUNNING.equals(state)) {
Collection<? extends TaskRunnerWorkItem> knownRunningTasks = runner.getRunningTasks();
Set<String> runningTaskIds = knownRunningTasks
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.collect(Collectors.toSet());
Map<String, TaskRunnerWorkItem> workItemIdMap = knownRunningTasks
.stream()
.collect(Collectors.toMap(
TaskRunnerWorkItem::getTaskId,
java.util.function.Function.identity(),
(previousWorkItem, newWorkItem) -> newWorkItem
));
final List<AnyTask> runningTasks = new ArrayList<>();
for (TaskRunnerWorkItem task : allTasks) {
if (runningTaskIds.contains(task.getTaskId())) {
runningTasks.add(((AnyTask) task).withTaskState(
TaskState.RUNNING,
RunnerTaskState.RUNNING,
workItemIdMap.get(task.getTaskId()).getCreatedTime(),
workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(),
workItemIdMap.get(task.getTaskId()).getLocation()
));
}
}
return runningTasks;
}
return allTasks;
}
private List<TaskStatusPlus> securedTaskStatusPlus( private List<TaskStatusPlus> securedTaskStatusPlus(
List<TaskStatusPlus> collectionToFilter, List<TaskStatusPlus> collectionToFilter,
@Nullable String dataSource, @Nullable String dataSource,
@Nullable String type,
HttpServletRequest req HttpServletRequest req
) )
{ {
@ -1061,127 +1113,17 @@ public class OverlordResource
new ResourceAction(new Resource(taskDatasource, ResourceType.DATASOURCE), Action.READ) new ResourceAction(new Resource(taskDatasource, ResourceType.DATASOURCE), Action.READ)
); );
}; };
List<TaskStatusPlus> optionalTypeFilteredList = collectionToFilter;
if (type != null) {
optionalTypeFilteredList = collectionToFilter
.stream()
.filter(task -> type.equals(task.getType()))
.collect(Collectors.toList());
}
if (dataSource != null) { if (dataSource != null) {
//skip auth check here, as it's already done in getTasks //skip auth check here, as it's already done in getTasks
return optionalTypeFilteredList; return collectionToFilter;
} }
return Lists.newArrayList( return Lists.newArrayList(
AuthorizationUtils.filterAuthorizedResources( AuthorizationUtils.filterAuthorizedResources(
req, req,
optionalTypeFilteredList, collectionToFilter,
raGenerator, raGenerator,
authorizerMapper authorizerMapper
) )
); );
} }
private static class AnyTask extends TaskRunnerWorkItem
{
private final String taskGroupId;
private final String taskType;
private final String dataSource;
private final TaskState taskState;
private final RunnerTaskState runnerTaskState;
private final DateTime createdTime;
private final DateTime queueInsertionTime;
private final TaskLocation taskLocation;
AnyTask(
String taskId,
String taskGroupId,
String taskType,
ListenableFuture<TaskStatus> result,
String dataSource,
TaskState state,
RunnerTaskState runnerState,
DateTime createdTime,
DateTime queueInsertionTime,
TaskLocation taskLocation
)
{
super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH);
this.taskGroupId = taskGroupId;
this.taskType = taskType;
this.dataSource = dataSource;
this.taskState = state;
this.runnerTaskState = runnerState;
this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime;
this.taskLocation = taskLocation;
}
@Override
public TaskLocation getLocation()
{
return taskLocation;
}
@Override
public String getTaskType()
{
return taskType;
}
@Override
public String getDataSource()
{
return dataSource;
}
public String getTaskGroupId()
{
return taskGroupId;
}
public TaskState getTaskState()
{
return taskState;
}
public RunnerTaskState getRunnerTaskState()
{
return runnerTaskState;
}
@Override
public DateTime getCreatedTime()
{
return createdTime;
}
@Override
public DateTime getQueueInsertionTime()
{
return queueInsertionTime;
}
public AnyTask withTaskState(
TaskState newTaskState,
RunnerTaskState runnerState,
DateTime createdTime,
DateTime queueInsertionTime,
TaskLocation taskLocation
)
{
return new AnyTask(
getTaskId(),
getTaskGroupId(),
getTaskType(),
getResult(),
getDataSource(),
newTaskState,
runnerState,
createdTime,
queueInsertionTime,
taskLocation
);
}
}
} }

View File

@ -25,7 +25,6 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskInfo;
@ -57,6 +56,9 @@ import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RE;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action; import org.apache.druid.server.security.Action;
@ -228,33 +230,38 @@ public class OverlordResourceTest
public void testSecuredGetWaitingTask() public void testSecuredGetWaitingTask()
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo<>(
"id_1", "id_1",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"), TaskStatus.running("id_1"),
"allow", "allow",
getTaskWithIdAndDatasource("id_1", "allow") getTaskWithIdAndDatasource("id_1", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_2", "id_2",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"), TaskStatus.running("id_2"),
"allow", "allow",
getTaskWithIdAndDatasource("id_2", "allow") getTaskWithIdAndDatasource("id_2", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_3", "id_3",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"), TaskStatus.running("id_3"),
"deny", "deny",
getTaskWithIdAndDatasource("id_3", "deny") getTaskWithIdAndDatasource("id_3", "deny")
), ),
new TaskInfo( new TaskInfo<>(
"id_4", "id_4",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_4"), TaskStatus.running("id_4"),
"deny", "deny",
getTaskWithIdAndDatasource("id_4", "deny") getTaskWithIdAndDatasource("id_4", "deny")
) )
@ -263,8 +270,8 @@ public class OverlordResourceTest
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn( EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
ImmutableList.of( ImmutableList.of(
new MockTaskRunnerWorkItem("id_1", null), new MockTaskRunnerWorkItem("id_1"),
new MockTaskRunnerWorkItem("id_4", null) new MockTaskRunnerWorkItem("id_4")
) )
); );
@ -288,30 +295,27 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3");
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem(tasksIds.get(0), null),
new MockTaskRunnerWorkItem(tasksIds.get(1), null),
new MockTaskRunnerWorkItem(tasksIds.get(2), null)
));
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo<>(
"id_1", "id_1",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"), TaskStatus.success("id_1"),
"deny", "deny",
getTaskWithIdAndDatasource("id_1", "deny") getTaskWithIdAndDatasource("id_1", "deny")
), ),
new TaskInfo( new TaskInfo<>(
"id_2", "id_2",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"), TaskStatus.success("id_2"),
"allow", "allow",
getTaskWithIdAndDatasource("id_2", "allow") getTaskWithIdAndDatasource("id_2", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_3", "id_3",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"), TaskStatus.success("id_3"),
@ -328,8 +332,6 @@ public class OverlordResourceTest
req, req,
workerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter
); );
Assert.assertTrue(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null).size() == 3);
Assert.assertTrue(taskRunner.getRunningTasks().size() == 3);
List<TaskStatusPlus> responseObjects = (List) overlordResource List<TaskStatusPlus> responseObjects = (List) overlordResource
.getCompleteTasks(null, req).getEntity(); .getCompleteTasks(null, req).getEntity();
@ -345,28 +347,35 @@ public class OverlordResourceTest
List<String> tasksIds = ImmutableList.of("id_1", "id_2"); List<String> tasksIds = ImmutableList.of("id_1", "id_2");
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn( EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of( ImmutableList.of(
new MockTaskRunnerWorkItem(tasksIds.get(0), null), new MockTaskRunnerWorkItem(tasksIds.get(0)),
new MockTaskRunnerWorkItem(tasksIds.get(1), null) new MockTaskRunnerWorkItem(tasksIds.get(1))
) )
); );
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo<>(
"id_1", "id_1",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"), TaskStatus.running("id_1"),
"deny", "deny",
getTaskWithIdAndDatasource("id_1", "deny") getTaskWithIdAndDatasource("id_1", "deny")
), ),
new TaskInfo( new TaskInfo<>(
"id_2", "id_2",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"), TaskStatus.running("id_2"),
"allow", "allow",
getTaskWithIdAndDatasource("id_2", "allow") getTaskWithIdAndDatasource("id_2", "allow")
) )
) )
); );
EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andStubReturn(RunnerTaskState.RUNNING);
EasyMock.expect(taskRunner.getRunnerTaskState("id_2")).andStubReturn(RunnerTaskState.RUNNING);
EasyMock.replay( EasyMock.replay(
taskRunner, taskRunner,
@ -388,24 +397,54 @@ public class OverlordResourceTest
public void testGetTasks() public void testGetTasks()
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
//completed tasks EasyMock.expect(
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance(),
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null)
),
null
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo<>(
"id_5", "id_5",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_5"), TaskStatus.success("id_5"),
"deny", "deny",
getTaskWithIdAndDatasource("id_5", "deny") getTaskWithIdAndDatasource("id_5", "deny")
), ),
new TaskInfo( new TaskInfo<>(
"id_6", "id_6",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_6"), TaskStatus.success("id_6"),
"allow", "allow",
getTaskWithIdAndDatasource("id_6", "allow") getTaskWithIdAndDatasource("id_6", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_7",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_7"),
"allow",
getTaskWithIdAndDatasource("id_7", "allow")
),
new TaskInfo<>(
"id_5",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_5"),
"deny",
getTaskWithIdAndDatasource("id_5", "deny")
),
new TaskInfo<>(
"id_6",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_6"),
"allow",
getTaskWithIdAndDatasource("id_6", "allow")
),
new TaskInfo<>(
"id_7", "id_7",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_7"), TaskStatus.success("id_7"),
@ -414,59 +453,14 @@ public class OverlordResourceTest
) )
) )
); );
//active tasks
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
ImmutableList.of(
new TaskInfo(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"),
"allow",
getTaskWithIdAndDatasource("id_1", "allow")
),
new TaskInfo(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"),
"allow",
getTaskWithIdAndDatasource("id_2", "allow")
),
new TaskInfo(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"),
"deny",
getTaskWithIdAndDatasource("id_3", "deny")
),
new TaskInfo(
"id_4",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_4"),
"deny",
getTaskWithIdAndDatasource("id_4", "deny")
)
)
);
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn( EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
ImmutableList.of( ImmutableList.of(
new MockTaskRunnerWorkItem("id_1", null), new MockTaskRunnerWorkItem("id_1"),
new MockTaskRunnerWorkItem("id_4", null) new MockTaskRunnerWorkItem("id_4")
) )
).atLeastOnce(); ).atLeastOnce();
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getPendingTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem("id_4", null)
)
);
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem("id_1", null)
)
);
EasyMock.replay( EasyMock.replay(
taskRunner, taskRunner,
taskMaster, taskMaster,
@ -486,82 +480,75 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
//completed tasks //completed tasks
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, "allow")) EasyMock.expect(
.andStubReturn( taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null),
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()
),
"allow"
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo<>(
"id_5", "id_5",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_5"), TaskStatus.success("id_5"),
"allow", "allow",
getTaskWithIdAndDatasource("id_5", "allow") getTaskWithIdAndDatasource("id_5", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_6", "id_6",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_6"), TaskStatus.success("id_6"),
"allow", "allow",
getTaskWithIdAndDatasource("id_6", "allow") getTaskWithIdAndDatasource("id_6", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_7", "id_7",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_7"), TaskStatus.success("id_7"),
"allow", "allow",
getTaskWithIdAndDatasource("id_7", "allow") getTaskWithIdAndDatasource("id_7", "allow")
),
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_1"),
"allow",
getTaskWithIdAndDatasource("id_1", "allow")
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"),
"allow",
getTaskWithIdAndDatasource("id_2", "allow")
),
new TaskInfo<>(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"),
"allow",
getTaskWithIdAndDatasource("id_3", "allow")
),
new TaskInfo<>(
"id_4",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_4"),
"allow",
getTaskWithIdAndDatasource("id_4", "allow")
) )
) )
);
//active tasks
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn(
ImmutableList.of(
new TaskInfo(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"),
"allow",
getTaskWithIdAndDatasource("id_1", "allow")
),
new TaskInfo(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"),
"allow",
getTaskWithIdAndDatasource("id_2", "allow")
),
new TaskInfo(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"),
"allow",
getTaskWithIdAndDatasource("id_3", "allow")
),
new TaskInfo(
"id_4",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_4"),
"allow",
getTaskWithIdAndDatasource("id_4", "allow")
)
)
); );
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn( EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
ImmutableList.of( ImmutableList.of(
new MockTaskRunnerWorkItem("id_1", null), new MockTaskRunnerWorkItem("id_1"),
new MockTaskRunnerWorkItem("id_4", null) new MockTaskRunnerWorkItem("id_4")
) )
).atLeastOnce(); ).atLeastOnce();
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getPendingTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem("id_4", null)
)
);
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem("id_1", null)
)
);
EasyMock.replay( EasyMock.replay(
taskRunner, taskRunner,
taskMaster, taskMaster,
@ -576,7 +563,7 @@ public class OverlordResourceTest
.getEntity(); .getEntity();
Assert.assertEquals(7, responseObjects.size()); Assert.assertEquals(7, responseObjects.size());
Assert.assertEquals("id_5", responseObjects.get(0).getId()); Assert.assertEquals("id_5", responseObjects.get(0).getId());
Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource())); Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource());
} }
@Test @Test
@ -584,33 +571,41 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
//active tasks //active tasks
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()
),
null
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo<>(
"id_1", "id_1",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"), TaskStatus.running("id_1"),
"allow", "allow",
getTaskWithIdAndDatasource("id_1", "allow") getTaskWithIdAndDatasource("id_1", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_2", "id_2",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"), TaskStatus.running("id_2"),
"allow", "allow",
getTaskWithIdAndDatasource("id_2", "allow") getTaskWithIdAndDatasource("id_2", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_3", "id_3",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"), TaskStatus.running("id_3"),
"deny", "deny",
getTaskWithIdAndDatasource("id_3", "deny") getTaskWithIdAndDatasource("id_3", "deny")
), ),
new TaskInfo( new TaskInfo<>(
"id_4", "id_4",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_4"), TaskStatus.running("id_4"),
"deny", "deny",
getTaskWithIdAndDatasource("id_4", "deny") getTaskWithIdAndDatasource("id_4", "deny")
) )
@ -619,8 +614,8 @@ public class OverlordResourceTest
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn( EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
ImmutableList.of( ImmutableList.of(
new MockTaskRunnerWorkItem("id_1", null), new MockTaskRunnerWorkItem("id_1"),
new MockTaskRunnerWorkItem("id_4", null) new MockTaskRunnerWorkItem("id_4")
) )
); );
@ -649,33 +644,41 @@ public class OverlordResourceTest
public void testGetTasksFilterRunningState() public void testGetTasksFilterRunningState()
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn( EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()
),
"allow"
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo<>(
"id_1", "id_1",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"), TaskStatus.running("id_1"),
"allow", "allow",
getTaskWithIdAndDatasource("id_1", "allow") getTaskWithIdAndDatasource("id_1", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_2", "id_2",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"), TaskStatus.running("id_2"),
"allow", "allow",
getTaskWithIdAndDatasource("id_2", "allow") getTaskWithIdAndDatasource("id_2", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_3", "id_3",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"), TaskStatus.running("id_3"),
"allow", "allow",
getTaskWithIdAndDatasource("id_3", "allow") getTaskWithIdAndDatasource("id_3", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_4", "id_4",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_4"), TaskStatus.running("id_4"),
"deny", "deny",
getTaskWithIdAndDatasource("id_4", "deny") getTaskWithIdAndDatasource("id_4", "deny")
) )
@ -685,11 +688,12 @@ public class OverlordResourceTest
List<String> tasksIds = ImmutableList.of("id_1", "id_2"); List<String> tasksIds = ImmutableList.of("id_1", "id_2");
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn( EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of( ImmutableList.of(
new MockTaskRunnerWorkItem(tasksIds.get(0), null), new MockTaskRunnerWorkItem(tasksIds.get(0), "allow", "test"),
new MockTaskRunnerWorkItem(tasksIds.get(1), null) new MockTaskRunnerWorkItem(tasksIds.get(1), "allow", "test")
) )
); );
EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING);
EasyMock.expect(taskRunner.getRunnerTaskState("id_2")).andReturn(RunnerTaskState.RUNNING);
EasyMock.replay( EasyMock.replay(
taskRunner, taskRunner,
@ -706,8 +710,7 @@ public class OverlordResourceTest
Assert.assertEquals(2, responseObjects.size()); Assert.assertEquals(2, responseObjects.size());
Assert.assertEquals(tasksIds.get(0), responseObjects.get(0).getId()); Assert.assertEquals(tasksIds.get(0), responseObjects.get(0).getId());
String ds = responseObjects.get(0).getDataSource(); Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource());
Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource()));
} }
@Test @Test
@ -718,43 +721,52 @@ public class OverlordResourceTest
List<String> tasksIds = ImmutableList.of("id_1", "id_2"); List<String> tasksIds = ImmutableList.of("id_1", "id_2");
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getPendingTasks()).andReturn( EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getPendingTasks()).andReturn(
ImmutableList.of( ImmutableList.of(
new MockTaskRunnerWorkItem(tasksIds.get(0), null), new MockTaskRunnerWorkItem(tasksIds.get(0)),
new MockTaskRunnerWorkItem(tasksIds.get(1), null) new MockTaskRunnerWorkItem(tasksIds.get(1))
) )
); );
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo<>(
"id_1", "id_1",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"), TaskStatus.running("id_1"),
"deny", "deny",
getTaskWithIdAndDatasource("id_1", "deny") getTaskWithIdAndDatasource("id_1", "deny")
), ),
new TaskInfo( new TaskInfo<>(
"id_2", "id_2",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"), TaskStatus.running("id_2"),
"allow", "allow",
getTaskWithIdAndDatasource("id_2", "allow") getTaskWithIdAndDatasource("id_2", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_3", "id_3",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"), TaskStatus.running("id_3"),
"allow", "allow",
getTaskWithIdAndDatasource("id_3", "allow") getTaskWithIdAndDatasource("id_3", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_4", "id_4",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_4"), TaskStatus.running("id_4"),
"deny", "deny",
getTaskWithIdAndDatasource("id_4", "deny") getTaskWithIdAndDatasource("id_4", "deny")
) )
) )
); );
EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andStubReturn(RunnerTaskState.PENDING);
EasyMock.expect(taskRunner.getRunnerTaskState("id_2")).andStubReturn(RunnerTaskState.PENDING);
EasyMock.expect(taskRunner.getRunnerTaskState("id_3")).andStubReturn(RunnerTaskState.RUNNING);
EasyMock.expect(taskRunner.getRunnerTaskState("id_4")).andStubReturn(RunnerTaskState.RUNNING);
EasyMock.replay( EasyMock.replay(
taskRunner, taskRunner,
@ -771,31 +783,35 @@ public class OverlordResourceTest
Assert.assertEquals(1, responseObjects.size()); Assert.assertEquals(1, responseObjects.size());
Assert.assertEquals(tasksIds.get(1), responseObjects.get(0).getId()); Assert.assertEquals(tasksIds.get(1), responseObjects.get(0).getId());
String ds = responseObjects.get(0).getDataSource(); Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource());
//Assert.assertTrue("DataSource Check", "ds_test".equals(responseObjects.get(0).getDataSource()));
} }
@Test @Test
public void testGetTasksFilterCompleteState() public void testGetTasksFilterCompleteState()
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)),
null
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo<>(
"id_1", "id_1",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"), TaskStatus.success("id_1"),
"allow", "allow",
getTaskWithIdAndDatasource("id_1", "allow") getTaskWithIdAndDatasource("id_1", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_2", "id_2",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"), TaskStatus.success("id_2"),
"deny", "deny",
getTaskWithIdAndDatasource("id_2", "deny") getTaskWithIdAndDatasource("id_2", "deny")
), ),
new TaskInfo( new TaskInfo<>(
"id_3", "id_3",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"), TaskStatus.success("id_3"),
@ -824,26 +840,29 @@ public class OverlordResourceTest
public void testGetTasksFilterCompleteStateWithInterval() public void testGetTasksFilterCompleteStateWithInterval()
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3");
Duration duration = new Period("PT86400S").toStandardDuration(); Duration duration = new Period("PT86400S").toStandardDuration();
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, duration, null)) EasyMock.expect(
.andStubReturn( taskStorageQueryAdapter.getTaskInfos(
EasyMock.anyObject(),
EasyMock.anyObject()
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo<>(
"id_1", "id_1",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"), TaskStatus.success("id_1"),
"deny", "deny",
getTaskWithIdAndDatasource("id_1", "deny") getTaskWithIdAndDatasource("id_1", "deny")
), ),
new TaskInfo( new TaskInfo<>(
"id_2", "id_2",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"), TaskStatus.success("id_2"),
"allow", "allow",
getTaskWithIdAndDatasource("id_2", "allow") getTaskWithIdAndDatasource("id_2", "allow")
), ),
new TaskInfo( new TaskInfo<>(
"id_3", "id_3",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"), TaskStatus.success("id_3"),
@ -851,7 +870,7 @@ public class OverlordResourceTest
getTaskWithIdAndDatasource("id_3", "allow") getTaskWithIdAndDatasource("id_3", "allow")
) )
) )
); );
EasyMock.replay( EasyMock.replay(
taskRunner, taskRunner,
@ -878,38 +897,34 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck(Users.WIKI_READER); expectAuthorizationTokenCheck(Users.WIKI_READER);
// Setup mocks to return completed, active, known, pending and running tasks // Setup mocks to return completed, active, known, pending and running tasks
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null),
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()
),
null
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
createTaskInfo("id_5", Datasources.WIKIPEDIA), createTaskInfo("id_5", Datasources.WIKIPEDIA),
createTaskInfo("id_6", Datasources.BUZZFEED) createTaskInfo("id_6", Datasources.BUZZFEED),
) createTaskInfo("id_1", Datasources.WIKIPEDIA, TaskState.RUNNING, "test"),
); createTaskInfo("id_4", Datasources.BUZZFEED, TaskState.RUNNING, "test")
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
ImmutableList.of(
createTaskInfo("id_1", Datasources.WIKIPEDIA),
createTaskInfo("id_2", Datasources.BUZZFEED)
) )
); );
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn( EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
ImmutableList.of( ImmutableList.of(
new MockTaskRunnerWorkItem("id_1", null), new MockTaskRunnerWorkItem("id_1", Datasources.WIKIPEDIA, "test"),
new MockTaskRunnerWorkItem("id_4", null) new MockTaskRunnerWorkItem("id_4", Datasources.BUZZFEED, "test")
) )
).atLeastOnce(); ).atLeastOnce();
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getPendingTasks()).andReturn( EasyMock.expect(taskRunner.getRunnerTaskState("id_4")).andReturn(RunnerTaskState.PENDING);
ImmutableList.of( EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING);
new MockTaskRunnerWorkItem("id_4", null)
)
);
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem("id_1", null)
)
);
// Replay all mocks // Replay all mocks
EasyMock.replay( EasyMock.replay(
@ -932,27 +947,68 @@ public class OverlordResourceTest
} }
@Test @Test
public void testGetTasksFilterByDatasourceRequiresReadAccess() public void testGetTasksFilterByTaskTypeRequiresDatasourceRead()
{ {
// Setup mocks for a user who has read access to "wikipedia" // Setup mocks for a user who has read access to "wikipedia"
// and no access to "buzzfeed" // and no access to "buzzfeed"
expectAuthorizationTokenCheck(Users.WIKI_READER); expectAuthorizationTokenCheck(Users.WIKI_READER);
// Setup mocks to return completed, active, known, pending and running tasks // Setup mocks to return completed, active, known, pending and running tasks
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null),
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()
),
null
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
createTaskInfo("id_5", Datasources.WIKIPEDIA), createTaskInfo("id_5", Datasources.WIKIPEDIA),
createTaskInfo("id_6", Datasources.BUZZFEED) createTaskInfo("id_6", Datasources.BUZZFEED),
createTaskInfo("id_1", Datasources.WIKIPEDIA, TaskState.RUNNING, "to-return"),
createTaskInfo("id_4", Datasources.WIKIPEDIA, TaskState.RUNNING, "test")
) )
); );
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
ImmutableList.of( ImmutableList.of(
createTaskInfo("id_1", Datasources.WIKIPEDIA), new MockTaskRunnerWorkItem("id_1", Datasources.WIKIPEDIA, "to-return"),
createTaskInfo("id_2", Datasources.BUZZFEED) new MockTaskRunnerWorkItem("id_4", Datasources.WIKIPEDIA, "test")
) )
).atLeastOnce();
EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING);
// Replay all mocks
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
); );
// Verify that only the task with the requested type (within the readable datasource) is returned
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks(null, null, null, null, "to-return", req)
.getEntity();
Assert.assertEquals(1, responseObjects.size());
for (TaskStatusPlus taskStatus : responseObjects) {
Assert.assertEquals("to-return", taskStatus.getType());
}
}
@Test
public void testGetTasksFilterByDatasourceRequiresReadAccess()
{
// Setup mocks for a user who has read access to "wikipedia"
// and no access to "buzzfeed"
expectAuthorizationTokenCheck(Users.WIKI_READER);
// Replay all mocks // Replay all mocks
EasyMock.replay( EasyMock.replay(
taskRunner, taskRunner,
@ -972,23 +1028,31 @@ public class OverlordResourceTest
public void testGetNullCompleteTask() public void testGetNullCompleteTask()
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
ImmutableMap.of(
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null)
),
null
)
).andStubReturn(
ImmutableList.of( ImmutableList.of(
new TaskInfo( new TaskInfo<>(
"id_1", "id_1",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"), TaskStatus.success("id_1"),
"allow", "allow",
null null
), ),
new TaskInfo( new TaskInfo<>(
"id_2", "id_2",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"), TaskStatus.success("id_2"),
"deny", "deny",
getTaskWithIdAndDatasource("id_2", "deny") getTaskWithIdAndDatasource("id_2", "deny")
), ),
new TaskInfo( new TaskInfo<>(
"id_3", "id_3",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"), TaskStatus.success("id_3"),
@ -1661,13 +1725,18 @@ public class OverlordResourceTest
} }
private Task getTaskWithIdAndDatasource(String id, String datasource) private Task getTaskWithIdAndDatasource(String id, String datasource)
{
return getTaskWithIdAndDatasource(id, datasource, "test");
}
private Task getTaskWithIdAndDatasource(String id, String datasource, String taskType)
{ {
return new AbstractTask(id, datasource, null) return new AbstractTask(id, datasource, null)
{ {
@Override @Override
public String getType() public String getType()
{ {
return "test"; return taskType;
} }
@Override @Override
@ -1689,14 +1758,27 @@ public class OverlordResourceTest
}; };
} }
private TaskInfo<Task, TaskStatus> createTaskInfo(String taskId, String datasource) private TaskInfo<Task, TaskStatus> createTaskInfo(
String taskId,
String datasource
)
{
return createTaskInfo(taskId, datasource, TaskState.SUCCESS, "test");
}
private TaskInfo<Task, TaskStatus> createTaskInfo(
String taskId,
String datasource,
TaskState state,
String taskType
)
{ {
return new TaskInfo<>( return new TaskInfo<>(
taskId, taskId,
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success(taskId), TaskStatus.fromCode(taskId, state),
datasource, datasource,
getTaskWithIdAndDatasource(taskId, datasource) getTaskWithIdAndDatasource(taskId, datasource, taskType)
); );
} }
@ -1721,12 +1803,23 @@ public class OverlordResourceTest
private static class MockTaskRunnerWorkItem extends TaskRunnerWorkItem private static class MockTaskRunnerWorkItem extends TaskRunnerWorkItem
{ {
private final String dataSource;
private final String type;
public MockTaskRunnerWorkItem(String taskId)
{
this(taskId, "ds_test", "test");
}
public MockTaskRunnerWorkItem( public MockTaskRunnerWorkItem(
String taskId, String taskId,
ListenableFuture<TaskStatus> result String dataSource,
String type
) )
{ {
super(taskId, result); super(taskId, null);
this.dataSource = dataSource;
this.type = type;
} }
@Override @Override
@ -1738,13 +1831,13 @@ public class OverlordResourceTest
@Override @Override
public String getTaskType() public String getTaskType()
{ {
return "test"; return type;
} }
@Override @Override
public String getDataSource() public String getDataSource()
{ {
return "ds_test"; return dataSource;
} }
} }
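For context on the resource changes exercised by the tests above: instead of querying completed and active tasks through two separate adapter calls (the source of the race), the Overlord now passes a map of task lookups to a single getTaskInfos call, so both groups come from the same metadata snapshot. A minimal usage sketch follows, assuming the TaskLookup classes introduced in this patch and a TaskStorageQueryAdapter instance; package names for classes outside this diff are assumptions, and the snippet is illustrative rather than part of the change:

import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;                 // package assumed
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; // package assumed
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.Duration;

import java.util.List;
import java.util.Map;

public class TaskLookupUsageSketch
{
  // Fetches active (waiting/pending/running) and completed tasks in one call;
  // a null datasource means "do not filter by datasource".
  static List<TaskInfo<Task, TaskStatus>> listAllTasks(TaskStorageQueryAdapter adapter)
  {
    Map<TaskLookupType, TaskLookup> lookups = ImmutableMap.of(
        TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance(),
        TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null) // no max count, no created-time bound
    );
    return adapter.getTaskInfos(lookups, null);
  }
}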
View File
@ -144,6 +144,11 @@ public class OverlordResourceTestClient
} }
} }
public List<TaskResponseObject> getAllTasks()
{
return getTasks("tasks");
}
public List<TaskResponseObject> getRunningTasks() public List<TaskResponseObject> getRunningTasks()
{ {
return getTasks("runningTasks"); return getTasks("runningTasks");
View File
@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.apache.druid.tests.query; package org.apache.druid.tests.api;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
@ -32,7 +32,7 @@ import java.util.function.Consumer;
@Test(groups = TestNGGroup.HTTP_ENDPOINT) @Test(groups = TestNGGroup.HTTP_ENDPOINT)
@Guice(moduleFactory = DruidTestModuleFactory.class) @Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITOverlordResourceTest public class ITOverlordResourceNotFoundTest
{ {
@Inject @Inject
protected OverlordResourceTestClient indexer; protected OverlordResourceTestClient indexer;
View File
@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.api;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.clients.TaskResponseObject;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.List;
@Test(groups = TestNGGroup.HTTP_ENDPOINT)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITOverlordResourceTest
{
private static final String INGESTION_SPEC = "/api/overlord-resource-test-task.json";
@Inject
protected OverlordResourceTestClient indexer;
@Test
public void testGetAllTasks() throws IOException
{
final String taskSpec = AbstractIndexerTest.getResourceAsString(INGESTION_SPEC);
final String taskId = indexer.submitTask(taskSpec);
ITRetryUtil.retryUntil(
() -> {
final List<TaskResponseObject> tasks = indexer.getAllTasks();
final TaskResponseObject taskStatus = tasks
.stream()
.filter(task -> taskId.equals(task.getId()))
.findAny()
.orElseThrow(() -> new ISE("Cannot find task[%s]", taskId));
TaskState status = taskStatus.getStatus();
if (status == TaskState.FAILED) {
throw new ISE("Task[%s] FAILED", taskId);
}
return status == TaskState.SUCCESS;
},
true,
ITRetryUtil.DEFAULT_RETRY_SLEEP,
ITRetryUtil.DEFAULT_RETRY_COUNT,
taskId
);
}
}
View File
@ -0,0 +1,87 @@
{
"type": "index_parallel",
"spec": {
"dataSchema": {
"dataSource": "it-overlord-resource-test",
"timestampSpec": {
"column": "timestamp"
},
"dimensionsSpec": {
"dimensions": [
"page",
{"type": "string", "name": "language", "createBitmapIndex": false},
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
]
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
},
{
"name": "thetaSketch",
"type": "thetaSketch",
"fieldName": "user"
},
{
"name": "quantilesDoublesSketch",
"type": "quantilesDoublesSketch",
"fieldName": "delta"
},
{
"name": "HLLSketchBuild",
"type": "HLLSketchBuild",
"fieldName": "user"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "second",
"intervals" : [ "2013-08-31/2013-09-02" ]
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "local",
"filter" : "*.json",
"baseDir": "/resources/data/batch_index/json"
},
"inputFormat": {
"type": "json"
}
},
"tuningConfig": {
"type": "index_parallel",
"maxNumConcurrentSubTasks": 1,
"splitHintSpec": {
"type": "maxSize",
"maxNumFiles": 1
}
}
}
}
View File
@ -22,12 +22,6 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import javax.annotation.Nullable;
import java.util.Map;
public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
@ -47,49 +41,9 @@ public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, L
} }
@Override @Override
protected Query<Map<String, Object>> createCompletedTaskInfoQuery( protected String decorateSqlWithLimit(String sql)
Handle handle,
DateTime timestamp,
@Nullable Integer maxNumStatuses,
@Nullable String dataSource
)
{ {
String sql = StringUtils.format( return sql + " FETCH FIRST :n ROWS ONLY";
"SELECT "
+ " id, "
+ " status_payload, "
+ " created_date, "
+ " datasource, "
+ " payload "
+ "FROM "
+ " %s "
+ "WHERE "
+ getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+ "ORDER BY created_date DESC",
getEntryTable()
);
if (maxNumStatuses != null) {
sql += " FETCH FIRST :n ROWS ONLY";
}
Query<Map<String, Object>> query = handle.createQuery(sql).bind("start", timestamp.toString());
if (maxNumStatuses != null) {
query = query.bind("n", maxNumStatuses);
}
if (dataSource != null) {
query = query.bind("ds", dataSource);
}
return query;
}
private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String datasource)
{
String sql = StringUtils.format("active = FALSE AND created_date >= :start ");
if (datasource != null) {
sql += " AND datasource = :ds ";
}
return sql;
} }
@Deprecated @Deprecated
View File
@ -20,13 +20,6 @@
package org.apache.druid.metadata; package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import javax.annotation.Nullable;
import java.util.Map;
public class MySQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> public class MySQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
@ -45,47 +38,8 @@ public class MySQLMetadataStorageActionHandler<EntryType, StatusType, LogType, L
} }
@Override @Override
protected Query<Map<String, Object>> createCompletedTaskInfoQuery( protected String decorateSqlWithLimit(String sql)
Handle handle,
DateTime timestamp,
@Nullable Integer maxNumStatuses,
@Nullable String dataSource
)
{ {
String sql = StringUtils.format( return sql + " LIMIT :n";
"SELECT "
+ " id, "
+ " status_payload, "
+ " created_date, "
+ " datasource, "
+ " payload "
+ "FROM "
+ " %s "
+ "WHERE "
+ getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+ "ORDER BY created_date DESC",
getEntryTable()
);
if (maxNumStatuses != null) {
sql += " LIMIT :n";
}
Query<Map<String, Object>> query = handle.createQuery(sql).bind("start", timestamp.toString());
if (maxNumStatuses != null) {
query = query.bind("n", maxNumStatuses);
}
if (dataSource != null) {
query = query.bind("ds", dataSource);
}
return query;
}
private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String datasource)
{
String sql = StringUtils.format("active = FALSE AND created_date >= :start ");
if (datasource != null) {
sql += " AND datasource = :ds ";
}
return sql;
} }
} }
View File
@ -21,12 +21,6 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import javax.annotation.Nullable;
import java.util.Map;
public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
@ -45,49 +39,9 @@ public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogTy
} }
@Override @Override
protected Query<Map<String, Object>> createCompletedTaskInfoQuery( protected String decorateSqlWithLimit(String sql)
Handle handle,
DateTime timestamp,
@Nullable Integer maxNumStatuses,
@Nullable String dataSource
)
{ {
String sql = StringUtils.format( return sql + " LIMIT :n";
"SELECT "
+ " id, "
+ " status_payload, "
+ " created_date, "
+ " datasource, "
+ " payload "
+ "FROM "
+ " %s "
+ "WHERE "
+ getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+ "ORDER BY created_date DESC",
getEntryTable()
);
if (maxNumStatuses != null) {
sql += " LIMIT :n";
}
Query<Map<String, Object>> query = handle.createQuery(sql).bind("start", timestamp.toString());
if (maxNumStatuses != null) {
query = query.bind("n", maxNumStatuses);
}
if (dataSource != null) {
query = query.bind("ds", dataSource);
}
return query;
}
private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String datasource)
{
String sql = StringUtils.format("active = FALSE AND created_date >= :start ");
if (datasource != null) {
sql += " AND datasource = :ds ";
}
return sql;
} }
@Deprecated @Deprecated
View File
@ -27,9 +27,12 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Folder3;
@ -269,37 +272,90 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
} }
@Override @Override
public List<TaskInfo<EntryType, StatusType>> getCompletedTaskInfo( public List<TaskInfo<EntryType, StatusType>> getTaskInfos(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String dataSource
)
{
return getConnector().retryTransaction(
(handle, status) -> {
final List<TaskInfo<EntryType, StatusType>> tasks = new ArrayList<>();
for (Entry<TaskLookupType, TaskLookup> entry : taskLookups.entrySet()) {
final Query<Map<String, Object>> query;
switch (entry.getKey()) {
case ACTIVE:
query = createActiveTaskInfoQuery(
handle,
dataSource
);
tasks.addAll(query.map(taskInfoMapper).list());
break;
case COMPLETE:
CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue();
query = createCompletedTaskInfoQuery(
handle,
completeTaskLookup.getTasksCreatedPriorTo(),
completeTaskLookup.getMaxTaskStatuses(),
dataSource
);
tasks.addAll(query.map(taskInfoMapper).list());
break;
default:
throw new IAE("Unknown TaskLookupType: [%s]", entry.getKey());
}
}
return tasks;
},
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
}
protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
Handle handle,
DateTime timestamp, DateTime timestamp,
@Nullable Integer maxNumStatuses, @Nullable Integer maxNumStatuses,
@Nullable String dataSource @Nullable String dataSource
) )
{ {
return getConnector().retryWithHandle( String sql = StringUtils.format(
handle -> { "SELECT "
final Query<Map<String, Object>> query = createCompletedTaskInfoQuery( + " id, "
handle, + " status_payload, "
timestamp, + " created_date, "
maxNumStatuses, + " datasource, "
dataSource + " payload "
); + "FROM "
return query.map(taskInfoMapper).list(); + " %s "
} + "WHERE "
+ getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+ "ORDER BY created_date DESC",
getEntryTable()
); );
if (maxNumStatuses != null) {
sql = decorateSqlWithLimit(sql);
}
Query<Map<String, Object>> query = handle.createQuery(sql).bind("start", timestamp.toString());
if (maxNumStatuses != null) {
query = query.bind("n", maxNumStatuses);
}
if (dataSource != null) {
query = query.bind("ds", dataSource);
}
return query;
} }
@Override protected abstract String decorateSqlWithLimit(String sql);
public List<TaskInfo<EntryType, StatusType>> getActiveTaskInfo(@Nullable String dataSource)
private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String datasource)
{ {
return getConnector().retryWithHandle( String sql = StringUtils.format("active = FALSE AND created_date >= :start ");
handle -> { if (datasource != null) {
final Query<Map<String, Object>> query = createActiveTaskInfoQuery( sql += " AND datasource = :ds ";
handle, }
dataSource return sql;
);
return query.map(taskInfoMapper).list();
}
);
} }
private Query<Map<String, Object>> createActiveTaskInfoQuery(Handle handle, @Nullable String dataSource) private Query<Map<String, Object>> createActiveTaskInfoQuery(Handle handle, @Nullable String dataSource)
@ -380,13 +436,6 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
} }
} }
protected abstract Query<Map<String, Object>> createCompletedTaskInfoQuery(
Handle handle,
DateTime timestamp,
@Nullable Integer maxNumStatuses,
@Nullable String dataSource
);
@Override @Override
public boolean addLock(final String entryId, final LockType lock) public boolean addLock(final String entryId, final LockType lock)
{ {
View File
@ -20,13 +20,6 @@
package org.apache.druid.metadata; package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import javax.annotation.Nullable;
import java.util.Map;
public class SQLServerMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> public class SQLServerMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
@ -45,45 +38,8 @@ public class SQLServerMetadataStorageActionHandler<EntryType, StatusType, LogTyp
} }
@Override @Override
protected Query<Map<String, Object>> createCompletedTaskInfoQuery( protected String decorateSqlWithLimit(String sql)
Handle handle,
DateTime timestamp,
@Nullable Integer maxNumStatuses,
@Nullable String dataSource
)
{ {
String sql = maxNumStatuses == null ? "SELECT " : "SELECT TOP (:n) "; return "SELECT TOP (:n)" + sql.substring("SELECT".length());
sql += StringUtils.format(
" id, "
+ " status_payload, "
+ " created_date, "
+ " datasource, "
+ " payload "
+ "FROM "
+ " %s "
+ "WHERE "
+ getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+ "ORDER BY created_date DESC",
getEntryTable()
);
Query<Map<String, Object>> query = handle.createQuery(sql).bind("start", timestamp.toString());
if (maxNumStatuses != null) {
query = query.bind("n", maxNumStatuses);
}
if (dataSource != null) {
query = query.bind("ds", dataSource);
}
return query;
}
private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String datasource)
{
String sql = StringUtils.format("active = FALSE AND created_date >= :start ");
if (datasource != null) {
sql += " AND datasource = :ds ";
}
return sql;
} }
} }
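With createCompletedTaskInfoQuery pulled up into SQLMetadataStorageActionHandler, each dialect subclass now supplies only the limit decoration shown above. A small standalone sketch of the two decoration styles applied to a simplified base query (illustrative only; the real SQL is assembled in the shared handler):

public class LimitDecorationSketch
{
  public static void main(String[] args)
  {
    // Simplified stand-in for the query built by the shared handler.
    String base = "SELECT id, status_payload, created_date, datasource, payload "
                  + "FROM druid_tasks WHERE active = FALSE AND created_date >= :start "
                  + "ORDER BY created_date DESC";

    // MySQL/PostgreSQL append a LIMIT clause; Derby appends FETCH FIRST :n ROWS ONLY.
    String limitStyle = base + " LIMIT :n";

    // SQL Server instead rewrites the leading SELECT into SELECT TOP (:n).
    String topStyle = "SELECT TOP (:n)" + base.substring("SELECT".length());

    System.out.println(limitStyle);
    System.out.println(topStyle);
  }
}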
View File
@ -31,6 +31,8 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -135,7 +137,7 @@ public class SQLMetadataStorageActionHandlerTest
Assert.assertEquals( Assert.assertEquals(
ImmutableList.of(Pair.of(entry, status1)), ImmutableList.of(Pair.of(entry, status1)),
handler.getActiveTaskInfo(null).stream() handler.getTaskInfos(ActiveTaskLookup.getInstance(), null).stream()
.map(taskInfo -> Pair.of(taskInfo.getTask(), taskInfo.getStatus())) .map(taskInfo -> Pair.of(taskInfo.getTask(), taskInfo.getStatus()))
.collect(Collectors.toList()) .collect(Collectors.toList())
); );
@ -144,14 +146,14 @@ public class SQLMetadataStorageActionHandlerTest
Assert.assertEquals( Assert.assertEquals(
ImmutableList.of(Pair.of(entry, status2)), ImmutableList.of(Pair.of(entry, status2)),
handler.getActiveTaskInfo(null).stream() handler.getTaskInfos(ActiveTaskLookup.getInstance(), null).stream()
.map(taskInfo -> Pair.of(taskInfo.getTask(), taskInfo.getStatus())) .map(taskInfo -> Pair.of(taskInfo.getTask(), taskInfo.getStatus()))
.collect(Collectors.toList()) .collect(Collectors.toList())
); );
Assert.assertEquals( Assert.assertEquals(
ImmutableList.of(), ImmutableList.of(),
handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null) handler.getTaskInfos(CompleteTaskLookup.withTasksCreatedPriorTo(null, DateTimes.of("2014-01-01")), null)
); );
Assert.assertTrue(handler.setStatus(entryId, false, status1)); Assert.assertTrue(handler.setStatus(entryId, false, status1));
@ -176,12 +178,12 @@ public class SQLMetadataStorageActionHandlerTest
Assert.assertEquals( Assert.assertEquals(
ImmutableList.of(), ImmutableList.of(),
handler.getCompletedTaskInfo(DateTimes.of("2014-01-03"), null, null) handler.getTaskInfos(CompleteTaskLookup.withTasksCreatedPriorTo(null, DateTimes.of("2014-01-03")), null)
); );
Assert.assertEquals( Assert.assertEquals(
ImmutableList.of(status1), ImmutableList.of(status1),
handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null) handler.getTaskInfos(CompleteTaskLookup.withTasksCreatedPriorTo(null, DateTimes.of("2014-01-01")), null)
.stream() .stream()
.map(TaskInfo::getStatus) .map(TaskInfo::getStatus)
.collect(Collectors.toList()) .collect(Collectors.toList())
@ -199,9 +201,11 @@ public class SQLMetadataStorageActionHandlerTest
handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status); handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status);
} }
final List<TaskInfo<Map<String, Integer>, Map<String, Integer>>> statuses = handler.getCompletedTaskInfo( final List<TaskInfo<Map<String, Integer>, Map<String, Integer>>> statuses = handler.getTaskInfos(
DateTimes.of("2014-01-01"), CompleteTaskLookup.withTasksCreatedPriorTo(
7, 7,
DateTimes.of("2014-01-01")
),
null null
); );
Assert.assertEquals(7, statuses.size()); Assert.assertEquals(7, statuses.size());
@ -222,9 +226,11 @@ public class SQLMetadataStorageActionHandlerTest
handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status); handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status);
} }
final List<TaskInfo<Map<String, Integer>, Map<String, Integer>>> statuses = handler.getCompletedTaskInfo( final List<TaskInfo<Map<String, Integer>, Map<String, Integer>>> statuses = handler.getTaskInfos(
DateTimes.of("2014-01-01"), CompleteTaskLookup.withTasksCreatedPriorTo(
10, 10,
DateTimes.of("2014-01-01")
),
null null
); );
Assert.assertEquals(5, statuses.size()); Assert.assertEquals(5, statuses.size());
@ -409,14 +415,15 @@ public class SQLMetadataStorageActionHandlerTest
Assert.assertEquals( Assert.assertEquals(
ImmutableList.of(entryId2), ImmutableList.of(entryId2),
handler.getActiveTaskInfo(null).stream() handler.getTaskInfos(ActiveTaskLookup.getInstance(), null).stream()
.map(taskInfo -> taskInfo.getId()) .map(taskInfo -> taskInfo.getId())
.collect(Collectors.toList()) .collect(Collectors.toList())
); );
Assert.assertEquals( Assert.assertEquals(
ImmutableList.of(entryId3, entryId1), ImmutableList.of(entryId3, entryId1),
handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream() handler.getTaskInfos(CompleteTaskLookup.withTasksCreatedPriorTo(null, DateTimes.of("2014-01-01")), null)
.stream()
.map(taskInfo -> taskInfo.getId()) .map(taskInfo -> taskInfo.getId())
.collect(Collectors.toList()) .collect(Collectors.toList())
@ -426,13 +433,14 @@ public class SQLMetadataStorageActionHandlerTest
// active task not removed. // active task not removed.
Assert.assertEquals( Assert.assertEquals(
ImmutableList.of(entryId2), ImmutableList.of(entryId2),
handler.getActiveTaskInfo(null).stream() handler.getTaskInfos(ActiveTaskLookup.getInstance(), null).stream()
.map(taskInfo -> taskInfo.getId()) .map(taskInfo -> taskInfo.getId())
.collect(Collectors.toList()) .collect(Collectors.toList())
); );
Assert.assertEquals( Assert.assertEquals(
ImmutableList.of(entryId3), ImmutableList.of(entryId3),
handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream() handler.getTaskInfos(CompleteTaskLookup.withTasksCreatedPriorTo(null, DateTimes.of("2014-01-01")), null)
.stream()
.map(taskInfo -> taskInfo.getId()) .map(taskInfo -> taskInfo.getId())
.collect(Collectors.toList()) .collect(Collectors.toList())