Index service support for early returns and choice of commit semantics.

Task:
- Add TaskCallback to run method (for early returns)

TaskStatus:
- Remove CONTINUED status
- Add segmentsNuked (placeholder for future deletion support)
- Add more builder methods
- Add validations to constructor

TaskStorage:
- Add TaskStorageQueryAdapter, a concrete class that wraps TaskStorages and
  provides various read-only convenience methods
- Add getTask method for benefit of TaskStorageQueryAdapter

TaskQueue:
- Rename "done" to "notify"
- notify is responsible for deciding if we should commit
- Add optional commitRunnable to "notify", which gets called when it's time to commit
- Allow nextTasks and commits to run early (statusCode RUNNING)
- Move getStatus, collapseStatus functionality to TaskStorageQueryAdapter
This commit is contained in:
Gian Merlino 2013-01-25 10:40:15 -08:00
parent 9b6244ec15
commit a14200d779
29 changed files with 933 additions and 391 deletions

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator;
package com.metamx.druid.merger.common;
import com.metamx.druid.merger.common.TaskStatus;

View File

@ -22,22 +22,31 @@ package com.metamx.druid.merger.common;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.task.Task;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
* Represents the status of a task. The task may be ongoing ({@link #isComplete()} false) or it may be
* complete ({@link #isComplete()} true). Ongoing tasks may request segments to be created, but only
* complete tasks may request segments to be nuked or spawn other tasks. Failed tasks may not request
* anything.
*
* TaskStatus objects are immutable.
*/
public class TaskStatus
{
public static enum Status
{
RUNNING,
SUCCESS,
FAILED,
CONTINUED
FAILED
}
public static TaskStatus running(String taskId)
@ -45,37 +54,46 @@ public class TaskStatus
return new TaskStatus(
taskId,
Status.RUNNING,
Collections.<DataSegment>emptyList(),
Collections.<Task>emptyList(),
ImmutableSet.<DataSegment>of(),
ImmutableSet.<DataSegment>of(),
ImmutableList.<Task>of(),
-1
);
}
public static TaskStatus success(String taskId)
{
return success(taskId, ImmutableSet.<DataSegment>of());
}
public static TaskStatus success(String taskId, Set<DataSegment> segments)
{
return new TaskStatus(
taskId,
Status.SUCCESS,
segments,
ImmutableSet.<DataSegment>of(),
ImmutableList.<Task>of(),
-1
);
}
public static TaskStatus failure(String taskId)
{
return new TaskStatus(taskId, Status.FAILED, Collections.<DataSegment>emptyList(), Collections.<Task>emptyList(), -1);
}
public static TaskStatus success(String taskId, List<DataSegment> segments)
{
return new TaskStatus(taskId, Status.SUCCESS, ImmutableList.copyOf(segments), Collections.<Task>emptyList(), -1);
}
public static TaskStatus continued(String taskId, List<Task> nextTasks)
{
Preconditions.checkArgument(nextTasks.size() > 0, "nextTasks.size() > 0");
return new TaskStatus(
taskId,
Status.CONTINUED,
Collections.<DataSegment>emptyList(),
ImmutableList.copyOf(nextTasks),
Status.FAILED,
ImmutableSet.<DataSegment>of(),
ImmutableSet.<DataSegment>of(),
ImmutableList.<Task>of(),
-1
);
}
private final String id;
private final List<DataSegment> segments;
private final List<Task> nextTasks;
private final ImmutableSet<DataSegment> segments;
private final ImmutableSet<DataSegment> segmentsNuked;
private final ImmutableList<Task> nextTasks;
private final Status status;
private final long duration;
@ -83,16 +101,42 @@ public class TaskStatus
private TaskStatus(
@JsonProperty("id") String id,
@JsonProperty("status") Status status,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("segmentsNuked") Set<DataSegment> segmentsNuked,
@JsonProperty("nextTasks") List<Task> nextTasks,
@JsonProperty("duration") long duration
)
{
this.id = id;
this.segments = segments;
this.nextTasks = nextTasks;
this.segments = ImmutableSet.copyOf(segments);
this.segmentsNuked = ImmutableSet.copyOf(segmentsNuked);
this.nextTasks = ImmutableList.copyOf(nextTasks);
this.status = status;
this.duration = duration;
// Check class invariants.
Preconditions.checkNotNull(id, "id");
Preconditions.checkNotNull(status, "status");
if (this.segments.size() > 0) {
Preconditions.checkArgument(
status == Status.RUNNING || status == Status.SUCCESS,
"segments not allowed for %s tasks",
status
);
}
if (this.segmentsNuked.size() > 0) {
Preconditions.checkArgument(status == Status.SUCCESS, "segmentsNuked not allowed for %s tasks", status);
}
if (this.nextTasks.size() > 0) {
Preconditions.checkArgument(
status == Status.SUCCESS || status == Status.RUNNING,
"nextTasks not allowed for %s tasks",
status
);
}
}
@JsonProperty("id")
@ -108,11 +152,17 @@ public class TaskStatus
}
@JsonProperty("segments")
public List<DataSegment> getSegments()
public Set<DataSegment> getSegments()
{
return segments;
}
@JsonProperty("segmentsNuked")
public Set<DataSegment> getSegmentsNuked()
{
return segmentsNuked;
}
@JsonProperty("nextTasks")
public List<Task> getNextTasks()
{
@ -127,51 +177,62 @@ public class TaskStatus
/**
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
* isContinued, isSuccess, or isFailure will be true at any one time.
* isSuccess, or isFailure will be true at any one time.
*/
@JsonIgnore
public boolean isRunnable()
{
return status == Status.RUNNING;
}
/**
* Returned by tasks when they complete successfully without spawning subtasks. Exactly one of isRunnable,
* isContinued, isSuccess, or isFailure will be true at any one time.
*/
public boolean isContinued()
{
return status == Status.CONTINUED;
}
/**
* Inverse of {@link #isRunnable}.
*/
@JsonIgnore
public boolean isComplete()
{
return !isRunnable();
}
/**
* Returned by tasks when they spawn subtasks. Exactly one of isRunnable, isContinued, isSuccess, or isFailure will
* Returned by tasks when they spawn subtasks. Exactly one of isRunnable, isSuccess, or isFailure will
* be true at any one time.
*/
@JsonIgnore
public boolean isSuccess()
{
return status == Status.SUCCESS;
}
/**
* Returned by tasks when they complete unsuccessfully. Exactly one of isRunnable, isContinued, isSuccess, or
* Returned by tasks when they complete unsuccessfully. Exactly one of isRunnable, isSuccess, or
* isFailure will be true at any one time.
*/
@JsonIgnore
public boolean isFailure()
{
return status == Status.FAILED;
}
public TaskStatus withSegments(Set<DataSegment> _segments)
{
return new TaskStatus(id, status, _segments, segmentsNuked, nextTasks, duration);
}
public TaskStatus withSegmentsNuked(Set<DataSegment> _segmentsNuked)
{
return new TaskStatus(id, status, segments, _segmentsNuked, nextTasks, duration);
}
public TaskStatus withNextTasks(List<Task> _nextTasks)
{
return new TaskStatus(id, status, segments, segmentsNuked, _nextTasks, duration);
}
public TaskStatus withDuration(long _duration)
{
return new TaskStatus(id, status, segments, nextTasks, _duration);
return new TaskStatus(id, status, segments, segmentsNuked, nextTasks, _duration);
}
@Override

View File

@ -19,6 +19,7 @@
package com.metamx.druid.merger.common.task;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import com.metamx.druid.QueryGranularity;
@ -29,6 +30,7 @@ import com.metamx.druid.index.v1.IncrementalIndexAdapter;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.IndexableAdapter;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.coordinator.TaskContext;
@ -72,7 +74,7 @@ public class DeleteTask extends AbstractTask
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
// Strategy: Create an empty segment covering the interval to be deleted
final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
@ -100,6 +102,6 @@ public class DeleteTask extends AbstractTask
segment.getVersion()
);
return TaskStatus.success(getId(), Lists.newArrayList(uploadedSegment));
return TaskStatus.success(getId(), ImmutableSet.of(uploadedSegment));
}
}

View File

@ -29,6 +29,7 @@ import com.google.common.collect.TreeMultiset;
import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.coordinator.TaskContext;
@ -88,15 +89,15 @@ public class IndexDeterminePartitionsTask extends AbstractTask
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
log.info("Running with targetPartitionSize[%d]", targetPartitionSize);
// This is similar to what DeterminePartitionsJob does in the hadoop indexer, but we don't require
// a preconfigured partition dimension (we'll just pick the one with highest cardinality).
// XXX - Space-efficiency (stores all unique dimension values, although at least not all combinations)
// XXX - Time-efficiency (runs all this on one single node instead of through map/reduce)
// NOTE: Space-efficiency (stores all unique dimension values, although at least not all combinations)
// NOTE: Time-efficiency (runs all this on one single node instead of through map/reduce)
// Blacklist dimensions that have multiple values per row
final Set<String> unusableDimensions = Sets.newHashSet();
@ -226,8 +227,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
}
}
return TaskStatus.continued(
getId(),
return TaskStatus.success(getId()).withNextTasks(
Lists.transform(
shardSpecs, new Function<ShardSpec, Task>()
{

View File

@ -20,11 +20,13 @@
package com.metamx.druid.merger.common.task;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.index.YeOldePlumberSchool;
@ -89,7 +91,7 @@ public class IndexGeneratorTask extends AbstractTask
}
@Override
public TaskStatus run(final TaskContext context, final TaskToolbox toolbox) throws Exception
public TaskStatus run(final TaskContext context, final TaskToolbox toolbox, TaskCallback callback) throws Exception
{
// Set up temporary directory for indexing
final File tmpDir = new File(
@ -174,7 +176,7 @@ public class IndexGeneratorTask extends AbstractTask
);
// Done
return TaskStatus.success(getId(), ImmutableList.copyOf(pushedSegments));
return TaskStatus.success(getId(), ImmutableSet.copyOf(pushedSegments));
}
/**

View File

@ -25,6 +25,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.indexer.granularity.GranularitySpec;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.coordinator.TaskContext;
@ -126,11 +127,11 @@ public class IndexTask extends AbstractTask
@Override
public TaskStatus preflight(TaskContext context) throws Exception
{
return TaskStatus.continued(getId(), toSubtasks());
return TaskStatus.success(getId()).withNextTasks(toSubtasks());
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
throw new IllegalStateException("IndexTasks should not be run!");
}

View File

@ -35,6 +35,7 @@ import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.loading.SegmentPuller;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.coordinator.TaskContext;
@ -115,7 +116,7 @@ public abstract class MergeTask extends AbstractTask
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
final ServiceEmitter emitter = toolbox.getEmitter();
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
@ -171,7 +172,7 @@ public abstract class MergeTask extends AbstractTask
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
return TaskStatus.success(getId(), Lists.newArrayList(uploadedSegment));
return TaskStatus.success(getId(), ImmutableSet.of(uploadedSegment));
}
catch (Exception e) {
log.error(

View File

@ -21,13 +21,10 @@ package com.metamx.druid.merger.common.task;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.coordinator.TaskContext;
import com.metamx.druid.merger.common.task.IndexDeterminePartitionsTask;
import com.metamx.druid.merger.common.task.IndexGeneratorTask;
import com.metamx.druid.merger.common.task.IndexTask;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonTypeInfo;
import org.joda.time.DateTime;
import org.joda.time.Interval;
/**
@ -81,8 +78,10 @@ public interface Task
*
* @param context Context for this task, gathered under indexer lock
* @param toolbox Toolbox for this task
* @param callback Callback for "early returns". Statuses returned to this callback must not be
* complete (isRunnable must be true).
* @return Some kind of finished status (isRunnable must be false).
* @throws Exception
*/
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception;
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception;
}

View File

@ -161,6 +161,36 @@ public class DbTaskStorage implements TaskStorage
);
}
@Override
public Optional<Task> getTask(final String taskid)
{
return dbi.withHandle(
new HandleCallback<Optional<Task>>()
{
@Override
public Optional<Task> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE id = :id",
dbConnectorConfig.getTaskTable()
)
)
.bind("id", taskid)
.list();
if(dbTasks.size() == 0) {
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbTasks);
return Optional.of(jsonMapper.readValue(dbStatus.get("payload").toString(), Task.class));
}
}
}
);
}
@Override
public Optional<TaskStatus> getStatus(final String taskid)
{

View File

@ -22,6 +22,7 @@ package com.metamx.druid.merger.coordinator;
import com.google.common.base.Throwables;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.task.Task;
@ -70,7 +71,7 @@ public class LocalTaskRunner implements TaskRunner
TaskStatus status;
try {
status = task.run(context, toolbox);
status = task.run(context, toolbox, callback);
}
catch (InterruptedException e) {
log.error(e, "Interrupted while running task[%s]", task);
@ -97,9 +98,9 @@ public class LocalTaskRunner implements TaskRunner
try {
callback.notify(status.withDuration(System.currentTimeMillis() - startTime));
} catch(Throwable t) {
log.error(t, "Uncaught Throwable during callback for task[%s]", task);
throw Throwables.propagate(t);
} catch(Exception e) {
log.error(e, "Uncaught Exception during callback for task[%s]", task);
throw Throwables.propagate(e);
}
}
}

View File

@ -59,6 +59,17 @@ public class LocalTaskStorage implements TaskStorage
tasks.put(task.getId(), new TaskStuff(task, status));
}
@Override
public Optional<Task> getTask(String taskid)
{
Preconditions.checkNotNull(taskid, "taskid");
if(tasks.containsKey(taskid)) {
return Optional.of(tasks.get(taskid).task);
} else {
return Optional.absent();
}
}
@Override
public void setStatus(String taskid, TaskStatus status)
{

View File

@ -33,6 +33,7 @@ import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskHolder;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;

View File

@ -19,16 +19,15 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.commit.CommitStyle;
import com.metamx.druid.merger.coordinator.commit.ImmediateCommitStyle;
import org.joda.time.Interval;
import java.util.Set;
import java.util.TreeSet;
import java.util.Map;
/**
* Represents a transaction as well as the lock it holds. Not immutable: the task set can change.
@ -39,16 +38,7 @@ public class TaskGroup
private final String dataSource;
private final Interval interval;
private final String version;
private final Set<Task> taskSet = new TreeSet<Task>(
new Ordering<Task>()
{
@Override
public int compare(Task task, Task task1)
{
return task.getId().compareTo(task1.getId());
}
}.nullsFirst()
);
private final Map<String, Task> taskMap = Maps.newHashMap();
public TaskGroup(String groupId, String dataSource, Interval interval, String version)
{
@ -78,9 +68,40 @@ public class TaskGroup
return version;
}
public Set<Task> getTaskSet()
public CommitStyle getCommitStyle()
{
return taskSet;
// TODO -- should be configurable
return new ImmediateCommitStyle();
}
public int size() {
return taskMap.size();
}
public boolean add(final Task task) {
Preconditions.checkArgument(
task.getGroupId().equals(groupId),
"Task group id[%s] != TaskGroup group id[%s]",
task.getGroupId(),
groupId
);
// Act sort of like a Set
if(taskMap.containsKey(task.getId())) {
return false;
} else {
taskMap.put(task.getId(), task);
return true;
}
}
public boolean contains(final String taskId) {
return taskMap.containsKey(taskId);
}
public Task remove(final String taskId)
{
return taskMap.remove(taskId);
}
@Override
@ -91,21 +112,7 @@ public class TaskGroup
.add("dataSource", dataSource)
.add("interval", interval)
.add("version", version)
.add(
"taskSet",
Lists.newArrayList(
Iterables.transform(
taskSet, new Function<Task, Object>()
{
@Override
public Object apply(Task task)
{
return task.getId();
}
}
)
)
)
.add("tasks", taskMap.keySet())
.toString();
}
}

View File

@ -23,20 +23,19 @@ import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Booleans;
import com.metamx.common.Pair;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -61,7 +60,7 @@ import java.util.concurrent.locks.ReentrantLock;
* only one TaskGroup can be running on a particular dataSource + interval, and that TaskGroup has a single version
* string that all tasks in the group must use to publish segments. Tasks in the same TaskGroup may run concurrently.
*
* For persistence, the queue saves new tasks from {@link #add} and task status updates from {@link #done} using a
* For persistence, the queue saves new tasks from {@link #add} and task status updates from {@link #notify} using a
* {@link TaskStorage} object.
*
* To support leader election of our containing system, the queue can be stopped (in which case it will not accept
@ -71,6 +70,8 @@ public class TaskQueue
{
private final List<Task> queue = Lists.newLinkedList();
private final Map<String, NavigableMap<Interval, TaskGroup>> running = Maps.newHashMap();
private final Multimap<String, String> seenNextTasks = HashMultimap.create();
private final TaskStorage taskStorage;
private final ReentrantLock giant = new ReentrantLock();
@ -78,7 +79,7 @@ public class TaskQueue
private volatile boolean active = false;
private static final Logger log = new Logger(TaskQueue.class);
private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);
public TaskQueue(TaskStorage taskStorage)
{
@ -100,36 +101,24 @@ public class TaskQueue
Preconditions.checkState(queue.isEmpty(), "queue must be empty!");
Preconditions.checkState(running.isEmpty(), "running list must be empty!");
// XXX - We might want a TaskStorage API that does this, but including the Pair type in the interface seems clumsy.
final List<Pair<Task, String>> runningTasks = Lists.transform(
taskStorage.getRunningTasks(),
new Function<Task, Pair<Task, String>>()
{
@Override
public Pair<Task, String> apply(Task task)
{
return Pair.of(task, taskStorage.getVersion(task.getId()).orNull());
}
}
);
final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage);
final List<VersionedTaskWrapper> runningTasks = taskStorageQueryAdapter.getRunningTaskVersions();
// Sort by version, with nulls last
final Ordering<Pair<Task, String>> byVersionOrdering = new Ordering<Pair<Task, String>>()
final Ordering<VersionedTaskWrapper> byVersionOrdering = new Ordering<VersionedTaskWrapper>()
{
final private Ordering<String> baseOrdering = Ordering.natural().nullsLast();
@Override
public int compare(
Pair<Task, String> left, Pair<Task, String> right
)
public int compare(VersionedTaskWrapper left, VersionedTaskWrapper right)
{
return baseOrdering.compare(left.rhs, right.rhs);
return baseOrdering.compare(left.getVersion(), right.getVersion());
}
};
for(final Pair<Task, String> taskAndVersion : byVersionOrdering.sortedCopy(runningTasks)) {
final Task task = taskAndVersion.lhs;
final String preferredVersion = taskAndVersion.rhs;
for(final VersionedTaskWrapper taskAndVersion : byVersionOrdering.sortedCopy(runningTasks)) {
final Task task = taskAndVersion.getTask();
final String preferredVersion = taskAndVersion.getVersion();
queue.add(task);
@ -169,6 +158,8 @@ public class TaskQueue
try {
log.info("Naptime!");
queue.clear();
running.clear();
active = false;
@ -184,21 +175,22 @@ public class TaskQueue
* @param task task to add
* @return true
*/
public boolean add(Task task)
public boolean add(final Task task)
{
giant.lock();
try {
Preconditions.checkState(active, "Queue is not active!");
// If this throws, we don't want to insert the task into our queue.
// (This is how we detect duplicates)
taskStorage.insert(task, TaskStatus.running(task.getId()));
queue.add(task);
workMayBeAvailable.signalAll();
return true;
}
finally {
} finally {
giant.unlock();
}
}
@ -254,6 +246,107 @@ public class TaskQueue
}
}
public void notify(final Task task, final TaskStatus status)
{
notify(task, status, null);
}
/**
* Notify this queue that some task has an updated status. If this update is valid, the status will be persisted in
* the task storage facility, and any nextTasks present in the status will be created. If the status is a completed
* status, the task will be unlocked and no further updates will be accepted. If this task has failed, the task group
* it is part of will be terminated.
*
* Finally, if this task is not supposed to be running, this method will simply do nothing.
*
* @param task task to update
* @param status new task status
* @param commitRunnable operation to perform if this task is ready to commit
* @throws NullPointerException if task or status is null
* @throws IllegalArgumentException if the task ID does not match the status ID
* @throws IllegalStateException if this queue is currently shut down
*/
public void notify(final Task task, final TaskStatus status, final Runnable commitRunnable)
{
giant.lock();
try {
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status");
Preconditions.checkState(active, "Queue is not active!");
Preconditions.checkArgument(
task.getId().equals(status.getId()),
"Mismatching task ids[%s/%s]",
task.getId(),
status.getId()
);
final TaskGroup taskGroup;
final Optional<TaskGroup> maybeTaskGroup = findTaskGroupForTask(task);
if(!maybeTaskGroup.isPresent()) {
log.info("Ignoring notification for dead task: %s", task.getId());
return;
} else {
taskGroup = maybeTaskGroup.get();
}
// Update status in DB
// TODO: We can either do this first, in which case we run the risk of having a task marked done in the DB but
// TODO: not committed here; or we can do it last, in which case we run the risk of having a task marked running
// TODO: in the DB but committed here. Currently, we err on the former side because we don't want a ticking time
// TODO: bomb in the DB (a task marked running that we have forgotten about, which will potentially be re-
// TODO: started much later when a coordinator bootstraps).
// TODO:
// TODO: Eventually we should have this status update enter a retry queue instead of throwing an exception
// TODO: if it fails.
taskStorage.setStatus(task.getId(), status);
// Should we commit?
if(taskGroup.getCommitStyle().shouldCommit(task, status)) {
log.info("Committing %s status for task: %s", status.getStatusCode(), task.getId());
// Add next tasks
try {
if(commitRunnable != null) {
log.info("Running commitRunnable for task: %s", task.getId());
commitRunnable.run();
}
// We want to allow tasks to submit RUNNING statuses with the same nextTasks over and over.
// So, we need to remember which ones we've already spawned and not do them again.
for(final Task nextTask : status.getNextTasks()) {
if(!seenNextTasks.containsEntry(task.getId(), nextTask.getId())) {
add(nextTask);
tryLock(nextTask);
seenNextTasks.put(task.getId(), nextTask.getId());
} else {
log.info("Already added followup task %s to original task: %s", nextTask.getId(), task.getId());
}
}
} catch(Exception e) {
log.makeAlert(e, "Failed to commit task")
.addData("task", task.getId())
.addData("statusCode", status.getStatusCode())
.emit();
// TODO -- If this fails, it should enter a retry queue instead of throwing an exception
taskStorage.setStatus(task.getId(), TaskStatus.failure(task.getId()).withDuration(status.getDuration()));
}
} else {
log.info("Not committing %s status for task: %s", status.getStatusCode(), task);
}
if(status.isComplete()) {
unlock(task);
seenNextTasks.removeAll(task.getId());
log.info("Task done: %s", task);
}
} finally {
giant.unlock();
}
}
/**
* Unlock some work. Does not update the task storage facility. Throws an exception if this work is not currently
* running.
@ -267,34 +360,21 @@ public class TaskQueue
try {
final String dataSource = task.getDataSource();
final Interval interval = task.getInterval();
final List<TaskGroup> maybeTaskGroup = Lists.newArrayList(
FunctionalIterable.create(findLocks(dataSource, interval))
.filter(
new Predicate<TaskGroup>()
{
@Override
public boolean apply(TaskGroup taskGroup)
{
return taskGroup.getTaskSet().contains(task);
}
}
)
);
final TaskGroup taskGroup;
if(maybeTaskGroup.size() == 1) {
taskGroup = maybeTaskGroup.get(0);
final Optional<TaskGroup> maybeTaskGroup = findTaskGroupForTask(task);
if(maybeTaskGroup.isPresent()) {
taskGroup = maybeTaskGroup.get();
} else {
throw new IllegalStateException(String.format("Task must be running: %s", task.getId()));
}
// Remove task from live list
log.info("Removing task[%s] from TaskGroup[%s]", task.getId(), taskGroup.getGroupId());
taskGroup.getTaskSet().remove(task);
taskGroup.remove(task.getId());
if(taskGroup.getTaskSet().size() == 0) {
if(taskGroup.size() == 0) {
log.info("TaskGroup complete: %s", taskGroup);
running.get(dataSource).remove(taskGroup.getInterval());
}
@ -310,111 +390,6 @@ public class TaskQueue
}
}
/**
* Unlock some task and update its status in the task storage facility. If "status" is a continuation status (i.e.
* it has nextTasks) this will add the next tasks to the queue with a generic running status.
*
* @param task task to unlock
* @param status task completion status; must not be runnable
* @throws IllegalStateException if task is not currently running, or if status is runnable
*/
public void done(final Task task, final TaskStatus status)
{
giant.lock();
try {
Preconditions.checkState(active, "Queue is not active!");
Preconditions.checkState(!status.isRunnable(), "status must no longer be runnable");
Preconditions.checkState(
task.getId().equals(status.getId()),
"Mismatching task ids[%s/%s]",
task.getId(),
status.getId()
);
// Might change on continuation failure
TaskStatus actualStatus = status;
// Add next tasks, if any
try {
for(final Task nextTask : status.getNextTasks()) {
add(nextTask);
tryLock(nextTask);
}
} catch(Exception e) {
log.error(e, "Failed to continue task: %s", task.getId());
actualStatus = TaskStatus.failure(task.getId());
}
unlock(task);
// Update status in DB
taskStorage.setStatus(task.getId(), actualStatus);
log.info("Task done: %s", task);
}
finally {
giant.unlock();
}
}
/**
* Returns task status for a particular task ID. May collapse "continued" statuses down to "success" or "failure"
* if appropriate.
*/
public Optional<TaskStatus> getStatus(final String taskid)
{
giant.lock();
try {
final Optional<TaskStatus> statusOptional = taskStorage.getStatus(taskid);
if(statusOptional.isPresent()) {
// See if we can collapse this down
return Optional.of(collapseStatus(statusOptional.get()));
} else {
return statusOptional;
}
}
finally {
giant.unlock();
}
}
private TaskStatus collapseStatus(TaskStatus status)
{
if (status.isContinued()) {
int nSubtasks = 0;
int nSuccesses = 0;
List<DataSegment> segments = Lists.newArrayList();
for(final Task subtask : status.getNextTasks()) {
final TaskStatus subtaskStatus = collapseStatus(taskStorage.getStatus(subtask.getId()).get());
nSubtasks ++;
if (subtaskStatus.isFailure()) {
return TaskStatus.failure(status.getId());
} else if (subtaskStatus.isSuccess()) {
nSuccesses++;
segments.addAll(subtaskStatus.getSegments());
}
}
if (nSubtasks == nSuccesses) {
return TaskStatus.success(status.getId(), segments);
}
}
// unable to collapse it down
return status;
}
/**
* Attempt to lock a task, without removing it from the queue. Can safely be called multiple times on the same task.
*
@ -442,7 +417,7 @@ public class TaskQueue
final String dataSource = task.getDataSource();
final Interval interval = task.getInterval();
final List<TaskGroup> foundLocks = findLocks(dataSource, interval);
final List<TaskGroup> foundLocks = findTaskGroupsForInterval(dataSource, interval);
final TaskGroup taskGroupToUse;
if (foundLocks.size() > 1) {
@ -495,7 +470,7 @@ public class TaskQueue
}
// Add to existing TaskGroup, if necessary
if (taskGroupToUse.getTaskSet().add(task)) {
if (taskGroupToUse.add(task)) {
log.info("Added task[%s] to TaskGroup[%s]", task.getId(), taskGroupToUse.getGroupId());
} else {
log.info("Task[%s] already present in TaskGroup[%s]", task.getId(), taskGroupToUse.getGroupId());
@ -509,10 +484,47 @@ public class TaskQueue
}
/**
* Return the currently-running task group for some task. If the task has no currently-running task group, this will
* return an absentee Optional.
*
* @param task task for which to locate group
*/
private Optional<TaskGroup> findTaskGroupForTask(final Task task)
{
giant.lock();
try {
final List<TaskGroup> maybeTaskGroup = Lists.newArrayList(
FunctionalIterable.create(findTaskGroupsForInterval(task.getDataSource(), task.getInterval()))
.filter(
new Predicate<TaskGroup>()
{
@Override
public boolean apply(TaskGroup taskGroup)
{
return taskGroup.contains(task.getId());
}
}
)
);
if(maybeTaskGroup.size() == 1) {
return Optional.of(maybeTaskGroup.get(0));
} else if(maybeTaskGroup.size() == 0) {
return Optional.absent();
} else {
throw new IllegalStateException(String.format("WTF?! Task %s is in multiple task groups!", task.getId()));
}
} finally {
giant.unlock();
}
}
/**
* Return all locks that overlap some search interval.
*/
private List<TaskGroup> findLocks(final String dataSource, final Interval interval)
private List<TaskGroup> findTaskGroupsForInterval(final String dataSource, final Interval interval)
{
giant.lock();

View File

@ -19,6 +19,7 @@
package com.metamx.druid.merger.coordinator;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.task.Task;
/**
@ -28,7 +29,8 @@ import com.metamx.druid.merger.common.task.Task;
public interface TaskRunner
{
/**
* Run a task with a particular context and call a callback. The callback should be called exactly once.
* Run a task with a particular context and call a callback. The callback may be called multiple times with RUNNING
* status, but should be called exactly once with a non-RUNNING status (e.g. SUCCESS, FAILED, CONTINUED...).
*
* @param task task to run
* @param context task context to run under

View File

@ -44,6 +44,14 @@ public interface TaskStorage
*/
public void setVersion(String taskid, String version);
/**
* Returns task as stored in the storage facility. If the task ID does not exist, this will return an
* absentee Optional.
*
* TODO -- This method probably wants to be combined with {@link #getStatus}.
*/
public Optional<Task> getTask(String taskid);
/**
* Returns task status as stored in the storage facility. If the task ID does not exist, this will return
* an absentee Optional.

View File

@ -0,0 +1,145 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Wraps a {@link TaskStorage}, providing a useful collection of read-only methods.
*/
public class TaskStorageQueryAdapter
{
private final TaskStorage storage;
public TaskStorageQueryAdapter(TaskStorage storage)
{
this.storage = storage;
}
public Optional<TaskStatus> getStatus(final String taskid)
{
return storage.getStatus(taskid);
}
/**
* Returns all recursive task statuses for a particular task from the same task group. Includes that task,
* plus any tasks it spawned, and so on. Excludes spawned tasks that ended up in a different task group.
*/
public Map<String, Optional<TaskStatus>> getGroupRecursiveStatuses(final String taskid)
{
final Optional<Task> taskOptional = storage.getTask(taskid);
final Optional<TaskStatus> statusOptional = storage.getStatus(taskid);
final ImmutableMap.Builder<String, Optional<TaskStatus>> resultBuilder = ImmutableMap.builder();
resultBuilder.put(taskid, statusOptional);
if(taskOptional.isPresent() && statusOptional.isPresent()) {
for(final Task nextTask : statusOptional.get().getNextTasks()) {
if(nextTask.getGroupId().equals(taskOptional.get().getGroupId())) {
resultBuilder.putAll(getGroupRecursiveStatuses(nextTask.getId()));
}
}
}
return resultBuilder.build();
}
/**
* Like {@link #getGroupRecursiveStatuses}, but flattens all recursive statuses for the same task group into a
* single, merged status.
*/
public Optional<TaskStatus> getGroupMergedStatus(final String taskid)
{
final Map<String, Optional<TaskStatus>> statuses = getGroupRecursiveStatuses(taskid);
int nSuccesses = 0;
int nFailures = 0;
int nTotal = 0;
final Set<DataSegment> segments = Sets.newHashSet();
final Set<DataSegment> segmentsNuked = Sets.newHashSet();
final List<Task> nextTasks = Lists.newArrayList();
for(final Optional<TaskStatus> statusOption : statuses.values()) {
nTotal ++;
if(statusOption.isPresent()) {
final TaskStatus status = statusOption.get();
segments.addAll(status.getSegments());
segmentsNuked.addAll(status.getSegmentsNuked());
nextTasks.addAll(status.getNextTasks());
if(status.isSuccess()) {
nSuccesses ++;
} else if(status.isFailure()) {
nFailures ++;
}
}
}
final Optional<TaskStatus> status;
if(nTotal == 0) {
status = Optional.absent();
} else if(nSuccesses == nTotal) {
status = Optional.of(TaskStatus.success(taskid)
.withSegments(segments)
.withSegmentsNuked(segmentsNuked)
.withNextTasks(nextTasks));
} else if(nFailures > 0) {
status = Optional.of(TaskStatus.failure(taskid));
} else {
status = Optional.of(TaskStatus.running(taskid));
}
return status;
}
/**
* Returns running tasks along with their preferred versions. If no version is present for a task, the
* version field of the returned {@link VersionedTaskWrapper} will be null.
*/
public List<VersionedTaskWrapper> getRunningTaskVersions()
{
return Lists.transform(
storage.getRunningTasks(),
new Function<Task, VersionedTaskWrapper>()
{
@Override
public VersionedTaskWrapper apply(Task task)
{
return new VersionedTaskWrapper(task, storage.getVersion(task.getId()).orNull());
}
}
);
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.merger.coordinator;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.task.Task;
/**

View File

@ -0,0 +1,32 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.commit;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskGroup;
/**
* Determines whether or not metadata from a task status update should be committed.
*/
public interface CommitStyle
{
public boolean shouldCommit(Task task, TaskStatus taskStatus);
}

View File

@ -0,0 +1,33 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.commit;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskGroup;
public class ImmediateCommitStyle implements CommitStyle
{
@Override
public boolean shouldCommit(Task task, TaskStatus taskStatus)
{
return true;
}
}

View File

@ -0,0 +1,32 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.commit;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
public class TaskCommitStyle implements CommitStyle
{
@Override
public boolean shouldCommit(Task task, TaskStatus taskStatus)
{
return taskStatus.isSuccess();
}
}

View File

@ -27,7 +27,7 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
import com.metamx.druid.merger.coordinator.TaskCallback;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.coordinator.TaskContext;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;
@ -108,7 +108,7 @@ public class TaskConsumer implements Runnable
// Retry would be nice, but only after we have a way to throttle and limit them. Just fail for now.
if(!shutdown) {
queue.done(task, TaskStatus.failure(task.getId()));
queue.notify(task, TaskStatus.failure(task.getId()));
}
}
}
@ -148,111 +148,116 @@ public class TaskConsumer implements Runnable
if (!preflightStatus.isRunnable()) {
log.info("Task finished during preflight: %s", task.getId());
queue.done(task, preflightStatus);
queue.notify(task, preflightStatus);
return;
}
// Hand off work to TaskRunner
// TODO -- Should something in the TaskCallback enforce that each returned status is logically after the previous?
// TODO -- Probably yes. But make sure it works in the face of RTR retries.
runner.run(
task, context, new TaskCallback()
{
@Override
public void notify(final TaskStatus statusFromRunner)
{
// task is done
log.info("TaskRunner finished task: %s", task);
// we might need to change this due to exceptions
TaskStatus status = statusFromRunner;
// If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set after
// we check and before we commit the database transaction, but better than nothing.
if(shutdown) {
log.info("Abandoning task due to shutdown: %s", task.getId());
return;
}
// Publish returned segments
// FIXME: Publish in transaction
try {
for (DataSegment segment : status.getSegments()) {
if (!task.getDataSource().equals(segment.getDataSource())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid dataSource: %s",
task.getId(),
segment.getIdentifier()
)
);
}
log.info("Received %s status for task: %s", statusFromRunner.getStatusCode(), task);
if (!task.getInterval().contains(segment.getInterval())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid interval: %s",
task.getId(),
segment.getIdentifier()
)
);
}
if (!context.getVersion().equals(segment.getVersion())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid version: %s",
task.getId(),
segment.getIdentifier()
)
);
}
log.info("Publishing segment[%s] for task[%s]", segment.getIdentifier(), task.getId());
mergerDBCoordinator.announceHistoricalSegment(segment);
// If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set after
// we check and before we commit the database transaction, but better than nothing.
if(shutdown) {
log.info("Abandoning task due to shutdown: %s", task.getId());
return;
}
}
catch (Exception e) {
log.error(e, "Exception while publishing segments for task: %s", task);
status = TaskStatus.failure(task.getId()).withDuration(status.getDuration());
}
try {
queue.done(task, status);
}
catch (Exception e) {
log.error(e, "Exception while marking task done: %s", task);
throw Throwables.propagate(e);
}
queue.notify(task, statusFromRunner, new Runnable()
{
@Override
public void run()
{
try {
// Publish returned segments
// TODO -- Publish in transaction
if(statusFromRunner.getSegments().size() > 0) {
for (DataSegment segment : statusFromRunner.getSegments()) {
if (!task.getDataSource().equals(segment.getDataSource())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid dataSource: %s",
task.getId(),
segment.getIdentifier()
)
);
}
// emit event and log
int bytes = 0;
for (DataSegment segment : status.getSegments()) {
bytes += segment.getSize();
}
if (!task.getInterval().contains(segment.getInterval())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid interval: %s",
task.getId(),
segment.getIdentifier()
)
);
}
builder.setUser3(status.getStatusCode().toString());
if (!context.getVersion().equals(segment.getVersion())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid version: %s",
task.getId(),
segment.getIdentifier()
)
);
}
emitter.emit(builder.build("indexer/time/run/millis", status.getDuration()));
emitter.emit(builder.build("indexer/segment/count", status.getSegments().size()));
emitter.emit(builder.build("indexer/segment/bytes", bytes));
log.info("Publishing segment[%s] for task[%s]", segment.getIdentifier(), task.getId());
mergerDBCoordinator.announceHistoricalSegment(segment);
}
}
} catch(Exception e) {
log.error(e, "Exception while publishing segments for task: %s", task);
throw Throwables.propagate(e);
}
}
});
if (status.isFailure()) {
log.makeAlert("Failed to index")
// Emit event and log, if the task is done
if(statusFromRunner.isComplete()) {
int segmentBytes = 0;
for (DataSegment segment : statusFromRunner.getSegments()) {
segmentBytes += segment.getSize();
}
builder.setUser3(statusFromRunner.getStatusCode().toString());
emitter.emit(builder.build("indexer/time/run/millis", statusFromRunner.getDuration()));
emitter.emit(builder.build("indexer/segment/count", statusFromRunner.getSegments().size()));
emitter.emit(builder.build("indexer/segment/bytes", segmentBytes));
if (statusFromRunner.isFailure()) {
log.makeAlert("Failed to index")
.addData("task", task.getId())
.addData("type", task.getType().toString())
.addData("dataSource", task.getDataSource())
.addData("interval", task.getInterval())
.emit();
}
log.info(
"Task %s: %s (%d segments) (%d run duration)",
statusFromRunner.getStatusCode(),
task,
statusFromRunner.getSegments().size(),
statusFromRunner.getDuration()
);
}
} catch(Exception e) {
log.makeAlert(e, "Failed to handle task callback")
.addData("task", task.getId())
.addData("type", task.getType().toString())
.addData("dataSource", task.getDataSource())
.addData("interval", task.getInterval())
.addData("statusCode", statusFromRunner.getStatusCode())
.emit();
}
log.info(
"Task %s: %s (%d segments) (%d run duration)",
status.getStatusCode(),
task,
status.getSegments().size(),
status.getDuration()
);
}
}
);

View File

@ -64,6 +64,7 @@ import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;
import com.metamx.druid.merger.coordinator.TaskRunnerFactory;
import com.metamx.druid.merger.coordinator.TaskStorage;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
@ -235,6 +236,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
config,
emitter,
taskQueue,
new TaskStorageQueryAdapter(taskStorage),
workerSetupManager
)
);

View File

@ -27,6 +27,7 @@ import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.MergeTask;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
@ -50,6 +51,7 @@ public class IndexerCoordinatorResource
private final IndexerCoordinatorConfig config;
private final ServiceEmitter emitter;
private final TaskQueue tasks;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final WorkerSetupManager workerSetupManager;
@Inject
@ -57,6 +59,7 @@ public class IndexerCoordinatorResource
IndexerCoordinatorConfig config,
ServiceEmitter emitter,
TaskQueue tasks,
TaskStorageQueryAdapter taskStorageQueryAdapter,
WorkerSetupManager workerSetupManager
) throws Exception
@ -64,6 +67,7 @@ public class IndexerCoordinatorResource
this.config = config;
this.emitter = emitter;
this.tasks = tasks;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.workerSetupManager = workerSetupManager;
}
@ -108,7 +112,7 @@ public class IndexerCoordinatorResource
@Produces("application/json")
public Response doStatus(@PathParam("taskid") String taskid)
{
final Optional<TaskStatus> status = tasks.getStatus(taskid);
final Optional<TaskStatus> status = taskStorageQueryAdapter.getGroupMergedStatus(taskid);
if (!status.isPresent()) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.http;
import com.google.inject.Provides;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.service.ServiceEmitter;
@ -39,6 +40,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
private final IndexerCoordinatorConfig indexerCoordinatorConfig;
private final ServiceEmitter emitter;
private final TaskQueue tasks;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final WorkerSetupManager workerSetupManager;
public IndexerCoordinatorServletModule(
@ -46,6 +48,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
IndexerCoordinatorConfig indexerCoordinatorConfig,
ServiceEmitter emitter,
TaskQueue tasks,
TaskStorageQueryAdapter taskStorageQueryAdapter,
WorkerSetupManager workerSetupManager
)
{
@ -53,6 +56,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
this.indexerCoordinatorConfig = indexerCoordinatorConfig;
this.emitter = emitter;
this.tasks = tasks;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.workerSetupManager = workerSetupManager;
}
@ -64,6 +68,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig);
bind(ServiceEmitter.class).toInstance(emitter);
bind(TaskQueue.class).toInstance(tasks);
bind(TaskStorageQueryAdapter.class).toInstance(taskStorageQueryAdapter);
bind(WorkerSetupManager.class).toInstance(workerSetupManager);
serve("/*").with(GuiceContainer.class);

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.worker;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskHolder;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
@ -110,7 +111,16 @@ public class TaskMonitor
try {
workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceStatus(TaskStatus.running(task.getId()));
taskStatus = task.run(taskContext, toolbox);
taskStatus = task.run(
taskContext, toolbox, new TaskCallback()
{
@Override
public void notify(TaskStatus status)
{
workerCuratorCoordinator.announceStatus(status);
}
}
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to run task")

View File

@ -96,7 +96,7 @@ public class WorkerNode extends RegisteringNode
private List<Monitor> monitors = null;
private ServiceEmitter emitter = null;
private IndexerCoordinatorConfig coordinatorConfig = null; // FIXME needed for task toolbox, but shouldn't be
private IndexerCoordinatorConfig coordinatorConfig = null; // TODO needed for task toolbox, but shouldn't be
private WorkerConfig workerConfig = null;
private TaskToolbox taskToolbox = null;
private CuratorFramework curatorFramework = null;

View File

@ -1,12 +1,14 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
@ -191,7 +193,7 @@ public class RemoteTaskRunnerTest
jsonMapper.writeValueAsBytes(
TaskStatus.success(
"task1",
Lists.<DataSegment>newArrayList()
ImmutableSet.<DataSegment>of()
)
)
);
@ -500,9 +502,9 @@ public class RemoteTaskRunnerTest
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
return TaskStatus.success("task1", Lists.<DataSegment>newArrayList());
return TaskStatus.success("task1", ImmutableSet.<DataSegment>of());
}
}
}

View File

@ -20,9 +20,11 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.task.AbstractTask;
@ -40,20 +42,16 @@ public class TaskQueueTest
@Test
public void testEmptyQueue() throws Exception
{
final TaskQueue tq = newTaskQueue();
final TaskStorage ts = new LocalTaskStorage();
final TaskQueue tq = newTaskQueueWithStorage(ts);
// get task status for nonexistent task
Assert.assertFalse("getStatus", tq.getStatus("foo").isPresent());
Assert.assertFalse("getStatus", ts.getStatus("foo").isPresent());
// poll on empty queue
Assert.assertNull("poll", tq.poll());
}
private static TaskQueue newTaskQueue()
{
return newTaskQueueWithStorage(new LocalTaskStorage());
}
public static TaskQueue newTaskQueueWithStorage(TaskStorage storage)
{
final TaskQueue tq = new TaskQueue(storage);
@ -64,7 +62,9 @@ public class TaskQueueTest
@Test
public void testAddRemove() throws Exception
{
final TaskQueue tq = newTaskQueue();
final TaskStorage ts = new LocalTaskStorage();
final TaskQueue tq = newTaskQueueWithStorage(ts);
final Task[] tasks = {
newTask("T0", "G0", "bar", new Interval("2011/P1Y")),
newTask("T1", "G1", "bar", new Interval("2011-03-01/P1D")),
@ -81,7 +81,7 @@ public class TaskQueueTest
}
// get task status for in-progress task
Assert.assertEquals("T2 status (before finishing)", TaskStatus.Status.RUNNING, tq.getStatus(tasks[2].getId()).get().getStatusCode());
Assert.assertEquals("T2 status (before finishing)", TaskStatus.Status.RUNNING, ts.getStatus(tasks[2].getId()).get().getStatusCode());
// Can't add tasks with the same id
thrown = null;
@ -114,20 +114,23 @@ public class TaskQueueTest
);
// mark one done
tq.done(tasks[2], tasks[2].run(null, null));
final TestCommitRunnable commit1 = newCommitRunnable();
tq.notify(tasks[2], tasks[2].run(null, null, null), commit1);
// get its status back
Assert.assertEquals("T2 status (after finishing)", TaskStatus.Status.SUCCESS, tq.getStatus(tasks[2].getId()).get().getStatusCode());
Assert.assertEquals(
"T2 status (after finishing)",
TaskStatus.Status.SUCCESS,
ts.getStatus(tasks[2].getId()).get().getStatusCode()
);
Assert.assertEquals("Commit #1 wasRun", commit1.wasRun(), true);
// Can't do a task twice
thrown = null;
try {
tq.done(tasks[2], tasks[2].run(null, null));
} catch(IllegalStateException e) {
thrown = e;
}
final TestCommitRunnable commit2 = newCommitRunnable();
tq.notify(tasks[2], tasks[2].run(null, null, null), commit2);
Assert.assertNotNull("Exception on twice-done task", thrown);
Assert.assertEquals("Commit #2 wasRun", commit2.wasRun(), false);
// we should be able to get one more task now
taken.clear();
@ -156,44 +159,45 @@ public class TaskQueueTest
@Test
public void testContinues() throws Exception
{
final TaskQueue tq = newTaskQueue();
final TaskStorage ts = new LocalTaskStorage();
final TaskQueue tq = newTaskQueueWithStorage(ts);
final Task t0 = newTask("T0", "G0", "bar", new Interval("2011/P1Y"));
final Task t1 = newContinuedTask("T1", "G1", "bar", new Interval("2013/P1Y"), Lists.newArrayList(t0));
tq.add(t1);
Assert.assertTrue("T0 isPresent (#1)", !tq.getStatus("T0").isPresent());
Assert.assertTrue("T1 isPresent (#1)", tq.getStatus("T1").isPresent());
Assert.assertTrue("T1 isRunnable (#1)", tq.getStatus("T1").get().isRunnable());
Assert.assertTrue("T1 isComplete (#1)", !tq.getStatus("T1").get().isComplete());
Assert.assertTrue("T0 isPresent (#1)", !ts.getStatus("T0").isPresent());
Assert.assertTrue("T1 isPresent (#1)", ts.getStatus("T1").isPresent());
Assert.assertTrue("T1 isRunnable (#1)", ts.getStatus("T1").get().isRunnable());
Assert.assertTrue("T1 isComplete (#1)", !ts.getStatus("T1").get().isComplete());
// should be able to get t1 out
Assert.assertEquals("poll #1", "T1", tq.poll().getTask().getId());
Assert.assertNull("poll #2", tq.poll());
// report T1 done. Should cause T0 to be created
tq.done(t1, t1.run(null, null));
tq.notify(t1, t1.run(null, null, null));
Assert.assertTrue("T0 isPresent (#2)", tq.getStatus("T0").isPresent());
Assert.assertTrue("T0 isRunnable (#2)", tq.getStatus("T0").get().isRunnable());
Assert.assertTrue("T0 isComplete (#2)", !tq.getStatus("T0").get().isComplete());
Assert.assertTrue("T1 isPresent (#2)", tq.getStatus("T1").isPresent());
Assert.assertTrue("T1 isRunnable (#2)", !tq.getStatus("T1").get().isRunnable());
Assert.assertTrue("T1 isComplete (#2)", tq.getStatus("T1").get().isComplete());
Assert.assertTrue("T0 isPresent (#2)", ts.getStatus("T0").isPresent());
Assert.assertTrue("T0 isRunnable (#2)", ts.getStatus("T0").get().isRunnable());
Assert.assertTrue("T0 isComplete (#2)", !ts.getStatus("T0").get().isComplete());
Assert.assertTrue("T1 isPresent (#2)", ts.getStatus("T1").isPresent());
Assert.assertTrue("T1 isRunnable (#2)", !ts.getStatus("T1").get().isRunnable());
Assert.assertTrue("T1 isComplete (#2)", ts.getStatus("T1").get().isComplete());
// should be able to get t0 out
Assert.assertEquals("poll #3", "T0", tq.poll().getTask().getId());
Assert.assertNull("poll #4", tq.poll());
// report T0 done. Should cause T0, T1 to be marked complete
tq.done(t0, t0.run(null, null));
tq.notify(t0, t0.run(null, null, null));
Assert.assertTrue("T0 isPresent (#3)", tq.getStatus("T0").isPresent());
Assert.assertTrue("T0 isRunnable (#3)", !tq.getStatus("T0").get().isRunnable());
Assert.assertTrue("T0 isComplete (#3)", tq.getStatus("T0").get().isComplete());
Assert.assertTrue("T1 isPresent (#3)", tq.getStatus("T1").isPresent());
Assert.assertTrue("T1 isRunnable (#3)", !tq.getStatus("T1").get().isRunnable());
Assert.assertTrue("T1 isComplete (#3)", tq.getStatus("T1").get().isComplete());
Assert.assertTrue("T0 isPresent (#3)", ts.getStatus("T0").isPresent());
Assert.assertTrue("T0 isRunnable (#3)", !ts.getStatus("T0").get().isRunnable());
Assert.assertTrue("T0 isComplete (#3)", ts.getStatus("T0").get().isComplete());
Assert.assertTrue("T1 isPresent (#3)", ts.getStatus("T1").isPresent());
Assert.assertTrue("T1 isRunnable (#3)", !ts.getStatus("T1").get().isRunnable());
Assert.assertTrue("T1 isComplete (#3)", ts.getStatus("T1").get().isComplete());
// should be no more events available for polling
Assert.assertNull("poll #5", tq.poll());
@ -202,7 +206,8 @@ public class TaskQueueTest
@Test
public void testConcurrency() throws Exception
{
final TaskQueue tq = newTaskQueue();
final TaskStorage ts = new LocalTaskStorage();
final TaskQueue tq = newTaskQueueWithStorage(ts);
// Imagine a larger task that splits itself up into pieces
final Task t1 = newTask("T1", "G0", "bar", new Interval("2011-01-01/P1D"));
@ -227,7 +232,7 @@ public class TaskQueueTest
Thread.sleep(5);
// Finish t0
tq.done(t0, t0.run(null, null));
tq.notify(t0, t0.run(null, null, null));
// take max number of tasks
final Set<String> taken = Sets.newHashSet();
@ -254,11 +259,11 @@ public class TaskQueueTest
Assert.assertEquals("taken", Sets.newHashSet("T1", "T3"), taken);
// Finish t1
tq.done(t1, t1.run(null, null));
tq.notify(t1, t1.run(null, null, null));
Assert.assertNull("null poll #2", tq.poll());
// Finish t3
tq.done(t3, t3.run(null, null));
tq.notify(t3, t3.run(null, null, null));
// We should be able to get t2 now
final VersionedTaskWrapper wt2 = tq.poll();
@ -268,7 +273,7 @@ public class TaskQueueTest
Assert.assertNull("null poll #3", tq.poll());
// Finish t2
tq.done(t2, t2.run(null, null));
tq.notify(t2, t2.run(null, null, null));
// We should be able to get t4
// And it should be in group G0, but that group should have a different version than last time
@ -281,7 +286,7 @@ public class TaskQueueTest
Assert.assertNotSame("wt4 version", wt2.getVersion(), wt4.getVersion());
// Kind of done testing at this point, but let's finish t4 anyway
tq.done(t4, t4.run(null, null));
tq.notify(t4, t4.run(null, null, null));
Assert.assertNull("null poll #4", tq.poll());
}
@ -299,7 +304,7 @@ public class TaskQueueTest
Assert.assertEquals("vt1 id", "T1", vt1.getTask().getId());
Assert.assertEquals("vt1 version", "1234", vt1.getVersion());
tq.done(vt1.getTask(), TaskStatus.success("T1", ImmutableList.<DataSegment>of()));
tq.notify(vt1.getTask(), TaskStatus.success("T1", ImmutableSet.<DataSegment>of()));
// re-bootstrap
tq.stop();
@ -309,16 +314,123 @@ public class TaskQueueTest
Assert.assertNull("null poll", tq.poll());
}
@Test
public void testRealtimeish() throws Exception
{
final TaskStorage ts = new LocalTaskStorage();
final TaskQueue tq = newTaskQueueWithStorage(ts);
class StructThingy
{
boolean pushed = false;
boolean pass1 = false;
boolean pass2 = false;
}
final StructThingy structThingy = new StructThingy();
// Test a task that acts sort of like the realtime task, to make sure this case works.
final Task rtTask = new AbstractTask("id1", "ds", new Interval("2010-01-01T00:00:00Z/PT1H"))
{
@Override
public Type getType()
{
return Type.TEST;
}
@Override
public TaskStatus run(
TaskContext context, TaskToolbox toolbox, TaskCallback callback
) throws Exception
{
final Set<DataSegment> segments = ImmutableSet.of(
DataSegment.builder()
.dataSource("ds")
.interval(new Interval("2010-01-01T00:00:00Z/PT1H"))
.version(context.getVersion())
.build()
);
final List<Task> nextTasks = ImmutableList.of(
newTask(
"id2",
"id2",
"ds",
new Interval(
"2010-01-01T01:00:00Z/PT1H"
)
)
);
final TaskStatus status1 = TaskStatus.running("id1").withNextTasks(nextTasks);
final TaskStatus status2 = TaskStatus.running("id1").withNextTasks(nextTasks).withSegments(segments);
final TaskStatus status3 = TaskStatus.success("id1").withNextTasks(nextTasks).withSegments(segments);
// Create a new realtime task!
callback.notify(status1);
if(ts.getStatus("id2").get().getStatusCode() == TaskStatus.Status.RUNNING) {
// test immediate creation of nextTask
structThingy.pass1 = true;
}
// Hand off a segment!
callback.notify(status2);
if(structThingy.pushed) {
// test immediate handoff of segment
structThingy.pass2 = true;
}
// Return success!
return status3;
}
};
tq.add(rtTask);
final VersionedTaskWrapper vt = tq.poll();
final TaskCallback callback = new TaskCallback()
{
@Override
public void notify(final TaskStatus status)
{
final Runnable commitRunnable = new Runnable()
{
@Override
public void run()
{
if(status.getNextTasks().size() > 0) {
structThingy.pushed = true;
}
}
};
tq.notify(vt.getTask(), status, commitRunnable);
}
};
callback.notify(vt.getTask().run(new TaskContext(vt.getVersion(), null), null, callback));
// OK, finally ready to test stuff.
Assert.assertTrue("pass1", structThingy.pass1);
Assert.assertTrue("pass2", structThingy.pass2);
Assert.assertTrue("id1 isSuccess", ts.getStatus("id1").get().isSuccess());
Assert.assertTrue(
"id1 isSuccess (merged)",
new TaskStorageQueryAdapter(ts).getGroupMergedStatus("id1").get().isSuccess()
);
Assert.assertTrue("id2 isRunnable", ts.getStatus("id2").get().isRunnable());
}
private static Task newTask(final String id, final String groupId, final String dataSource, final Interval interval)
{
return new AbstractTask(id, groupId, dataSource, interval)
{
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
return TaskStatus.success(
id,
Lists.newArrayList(
ImmutableSet.of(
new DataSegment(
dataSource,
interval,
@ -358,10 +470,31 @@ public class TaskQueueTest
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
return TaskStatus.continued(id, nextTasks);
return TaskStatus.success(id).withNextTasks(nextTasks);
}
};
}
private static TestCommitRunnable newCommitRunnable()
{
return new TestCommitRunnable();
}
private static class TestCommitRunnable implements Runnable
{
private boolean _wasRun = false;
@Override
public void run()
{
_wasRun = true;
}
public boolean wasRun()
{
return _wasRun;
}
}
}