Merger: Replace TaskCallback with ListenableFuture<TaskStatus>

This commit is contained in:
Gian Merlino 2013-03-18 17:53:38 -07:00
parent 5c3db75a01
commit 5f513be363
7 changed files with 103 additions and 151 deletions

View File

@ -1,25 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common;
public interface TaskCallback
{
public void notify(TaskStatus status);
}

View File

@ -21,28 +21,24 @@ package com.metamx.druid.merger.coordinator;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.common.RetryPolicy;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.task.Task;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.mortbay.thread.ThreadPool;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@ -53,7 +49,7 @@ import java.util.concurrent.ThreadPoolExecutor;
public class LocalTaskRunner implements TaskRunner
{
private final TaskToolboxFactory toolboxFactory;
private final ExecutorService exec;
private final ListeningExecutorService exec;
private final Set<TaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<TaskRunnerWorkItem>();
@ -65,7 +61,7 @@ public class LocalTaskRunner implements TaskRunner
)
{
this.toolboxFactory = toolboxFactory;
this.exec = exec;
this.exec = MoreExecutors.listeningDecorator(exec);
}
@LifecycleStop
@ -75,11 +71,10 @@ public class LocalTaskRunner implements TaskRunner
}
@Override
public void run(final Task task, final TaskCallback callback)
public ListenableFuture<TaskStatus> run(final Task task)
{
final TaskToolbox toolbox = toolboxFactory.build(task);
exec.submit(new LocalTaskRunnerRunnable(task, toolbox, callback));
return exec.submit(new LocalTaskRunnerCallable(task, toolbox));
}
@Override
@ -102,8 +97,8 @@ public class LocalTaskRunner implements TaskRunner
@Override
public TaskRunnerWorkItem apply(Runnable input)
{
if (input instanceof LocalTaskRunnerRunnable) {
return ((LocalTaskRunnerRunnable) input).getTaskRunnerWorkItem();
if (input instanceof LocalTaskRunnerCallable) {
return ((LocalTaskRunnerCallable) input).getTaskRunnerWorkItem();
}
return null;
}
@ -121,25 +116,23 @@ public class LocalTaskRunner implements TaskRunner
return Lists.newArrayList();
}
private static class LocalTaskRunnerRunnable implements Runnable
private static class LocalTaskRunnerCallable implements Callable<TaskStatus>
{
private final Task task;
private final TaskToolbox toolbox;
private final TaskCallback callback;
private final DateTime createdTime;
public LocalTaskRunnerRunnable(Task task, TaskToolbox toolbox, TaskCallback callback)
public LocalTaskRunnerCallable(Task task, TaskToolbox toolbox)
{
this.task = task;
this.toolbox = toolbox;
this.callback = callback;
this.createdTime = new DateTime();
}
@Override
public void run()
public TaskStatus call()
{
final long startTime = System.currentTimeMillis();
@ -175,7 +168,7 @@ public class LocalTaskRunner implements TaskRunner
}
try {
callback.notify(status.withDuration(System.currentTimeMillis() - startTime));
return status.withDuration(System.currentTimeMillis() - startTime);
} catch(Exception e) {
log.error(e, "Uncaught Exception during callback for task[%s]", task);
throw Throwables.propagate(e);
@ -186,7 +179,7 @@ public class LocalTaskRunner implements TaskRunner
{
return new TaskRunnerWorkItem(
task,
callback,
null,
null,
createdTime
);

View File

@ -27,12 +27,13 @@ import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
@ -212,18 +213,18 @@ public class RemoteTaskRunner implements TaskRunner
* A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task.
*
* @param task task to run
* @param callback callback to be called exactly once
*/
@Override
public void run(Task task, TaskCallback callback)
public ListenableFuture<TaskStatus> run(final Task task)
{
if (runningTasks.containsKey(task.getId()) || pendingTasks.containsKey(task.getId())) {
throw new ISE("Assigned a task[%s] that is already running or pending, WTF is happening?!", task.getId());
}
TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(
task, callback, retryPolicyFactory.makeRetryPolicy(), new DateTime()
task, SettableFuture.<TaskStatus>create(), retryPolicyFactory.makeRetryPolicy(), new DateTime()
);
addPendingTask(taskRunnerWorkItem);
return taskRunnerWorkItem.getResult();
}
private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem)
@ -447,9 +448,9 @@ public class RemoteTaskRunner implements TaskRunner
if (taskStatus.isComplete()) {
if (taskRunnerWorkItem != null) {
final TaskCallback callback = taskRunnerWorkItem.getCallback();
if (callback != null) {
callback.notify(taskStatus);
final SettableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
if (result != null) {
result.set(taskStatus);
}
}

View File

@ -19,7 +19,8 @@
package com.metamx.druid.merger.coordinator;
import com.metamx.druid.merger.common.TaskCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import java.util.Collection;
@ -31,13 +32,12 @@ import java.util.Collection;
public interface TaskRunner
{
/**
* Run a task with a particular context and call a callback. The callback may be called multiple times with RUNNING
* status, but should be called exactly once with a non-RUNNING status (e.g. SUCCESS, FAILED, CONTINUED...).
* Run a task. The returned status should be some kind of completed status.
*
* @param task task to run
* @param callback callback to be called exactly once
* @return task status, eventually
*/
public void run(Task task, TaskCallback callback);
public ListenableFuture<TaskStatus> run(Task task);
public Collection<TaskRunnerWorkItem> getRunningTasks();

View File

@ -20,8 +20,9 @@
package com.metamx.druid.merger.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.druid.merger.common.RetryPolicy;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
@ -32,7 +33,7 @@ import org.joda.time.DateTimeComparator;
public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
{
private final Task task;
private final TaskCallback callback;
private final SettableFuture<TaskStatus> result;
private final RetryPolicy retryPolicy;
private final DateTime createdTime;
@ -40,13 +41,13 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
public TaskRunnerWorkItem(
Task task,
TaskCallback callback,
SettableFuture<TaskStatus> result,
RetryPolicy retryPolicy,
DateTime createdTime
)
{
this.task = task;
this.callback = callback;
this.result = result;
this.retryPolicy = retryPolicy;
this.createdTime = createdTime;
}
@ -57,9 +58,9 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
return task;
}
public TaskCallback getCallback()
public SettableFuture<TaskStatus> getResult()
{
return callback;
return result;
}
public RetryPolicy getRetryPolicy()
@ -91,46 +92,12 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
return DateTimeComparator.getInstance().compare(createdTime, taskRunnerWorkItem.getCreatedTime());
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskRunnerWorkItem that = (TaskRunnerWorkItem) o;
if (callback != null ? !callback.equals(that.callback) : that.callback != null) {
return false;
}
if (retryPolicy != null ? !retryPolicy.equals(that.retryPolicy) : that.retryPolicy != null) {
return false;
}
if (task != null ? !task.equals(that.task) : that.task != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = task != null ? task.hashCode() : 0;
result = 31 * result + (callback != null ? callback.hashCode() : 0);
result = 31 * result + (retryPolicy != null ? retryPolicy.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "TaskRunnerWorkItem{" +
"task=" + task +
", callback=" + callback +
", result=" + result +
", retryPolicy=" + retryPolicy +
", createdTime=" + createdTime +
'}';

View File

@ -20,9 +20,11 @@
package com.metamx.druid.merger.coordinator.exec;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.task.Task;
@ -138,15 +140,34 @@ public class TaskConsumer implements Runnable
}
// Hand off work to TaskRunner, with a callback
runner.run(
task, new TaskCallback()
final ListenableFuture<TaskStatus> status = runner.run(task);
Futures.addCallback(
status, new FutureCallback<TaskStatus>()
{
@Override
public void notify(final TaskStatus statusFromRunner)
public void onSuccess(final TaskStatus status)
{
log.info("Received %s status for task: %s", status.getStatusCode(), task);
handleStatus(status);
}
@Override
public void onFailure(Throwable t)
{
log.makeAlert(t, "Failed to run task")
.addData("task", task.getId())
.addData("type", task.getType())
.addData("dataSource", task.getDataSource())
.addData("interval", task.getImplicitLockInterval())
.emit();
handleStatus(TaskStatus.failure(task.getId()));
}
private void handleStatus(TaskStatus status)
{
try {
log.info("Received %s status for task: %s", statusFromRunner.getStatusCode(), task);
// If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set after
// we check and before we commit the database transaction, but better than nothing.
if (shutdown) {
@ -154,34 +175,25 @@ public class TaskConsumer implements Runnable
return;
}
queue.notify(task, statusFromRunner);
queue.notify(task, status);
// Emit event and log, if the task is done
if (statusFromRunner.isComplete()) {
metricBuilder.setUser3(statusFromRunner.getStatusCode().toString());
emitter.emit(metricBuilder.build("indexer/time/run/millis", statusFromRunner.getDuration()));
if (statusFromRunner.isFailure()) {
log.makeAlert("Failed to index")
.addData("task", task.getId())
.addData("type", task.getType())
.addData("dataSource", task.getDataSource())
.addData("interval", task.getImplicitLockInterval())
.emit();
}
if (status.isComplete()) {
metricBuilder.setUser3(status.getStatusCode().toString());
emitter.emit(metricBuilder.build("indexer/time/run/millis", status.getDuration()));
log.info(
"Task %s: %s (%d run duration)",
statusFromRunner.getStatusCode(),
status.getStatusCode(),
task,
statusFromRunner.getDuration()
status.getDuration()
);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle task callback")
log.makeAlert(e, "Failed to handle task status")
.addData("task", task.getId())
.addData("statusCode", statusFromRunner.getStatusCode())
.addData("statusCode", status.getStatusCode())
.emit();
}
}

View File

@ -4,19 +4,20 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.metamx.common.ISE;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.TestTask;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
@ -130,10 +131,7 @@ public class RemoteTaskRunnerTest
@Test
public void testRunNoExistingTask() throws Exception
{
remoteTaskRunner.run(
task1,
null
);
remoteTaskRunner.run(task1);
}
@Test
@ -146,11 +144,10 @@ public class RemoteTaskRunnerTest
task1.getSegments(),
Lists.<AggregatorFactory>newArrayList(),
TaskStatus.running(task1.getId())
),
null
)
);
try {
remoteTaskRunner.run(task1, null);
remoteTaskRunner.run(task1);
fail("ISE expected");
}
catch (ISE expected) {
@ -182,8 +179,7 @@ public class RemoteTaskRunnerTest
),
Lists.<AggregatorFactory>newArrayList(),
TaskStatus.success("foo")
),
null
)
);
EasyMock.verify(emitter);
}
@ -192,6 +188,8 @@ public class RemoteTaskRunnerTest
public void testRunWithCallback() throws Exception
{
final MutableBoolean callbackCalled = new MutableBoolean(false);
Futures.addCallback(
remoteTaskRunner.run(
new TestTask(
task1.getId(),
@ -199,14 +197,20 @@ public class RemoteTaskRunnerTest
task1.getSegments(),
Lists.<AggregatorFactory>newArrayList(),
TaskStatus.running(task1.getId())
),
new TaskCallback()
)
), new FutureCallback<TaskStatus>()
{
@Override
public void notify(TaskStatus status)
public void onSuccess(TaskStatus taskStatus)
{
callbackCalled.setValue(true);
}
@Override
public void onFailure(Throwable throwable)
{
// neg
}
}
);