diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskCallback.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskCallback.java deleted file mode 100644 index edf3c9bae86..00000000000 --- a/merger/src/main/java/com/metamx/druid/merger/common/TaskCallback.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.merger.common; - -public interface TaskCallback -{ - public void notify(TaskStatus status); -} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java index b8a40cdb0b4..60d1def7f23 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java @@ -21,28 +21,24 @@ package com.metamx.druid.merger.coordinator; import com.google.common.base.Function; import com.google.common.base.Throwables; -import com.google.common.collect.Collections2; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; -import com.metamx.druid.merger.common.RetryPolicy; -import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.task.Task; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; -import org.mortbay.thread.ThreadPool; -import javax.annotation.Nullable; import java.io.File; import java.util.Collection; -import java.util.Collections; -import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -53,7 +49,7 @@ import java.util.concurrent.ThreadPoolExecutor; public class LocalTaskRunner implements TaskRunner { private final TaskToolboxFactory toolboxFactory; - private final ExecutorService exec; + private final ListeningExecutorService exec; private final Set runningItems = new ConcurrentSkipListSet(); @@ -65,7 +61,7 @@ public class LocalTaskRunner implements TaskRunner ) { this.toolboxFactory = toolboxFactory; - this.exec = exec; + this.exec = MoreExecutors.listeningDecorator(exec); } @LifecycleStop @@ -75,11 +71,10 @@ public class LocalTaskRunner implements TaskRunner } @Override - public void run(final Task task, final TaskCallback callback) + public ListenableFuture run(final Task task) { final TaskToolbox toolbox = toolboxFactory.build(task); - - exec.submit(new LocalTaskRunnerRunnable(task, toolbox, callback)); + return exec.submit(new LocalTaskRunnerCallable(task, toolbox)); } @Override @@ -102,8 +97,8 @@ public class LocalTaskRunner implements TaskRunner @Override public TaskRunnerWorkItem apply(Runnable input) { - if (input instanceof LocalTaskRunnerRunnable) { - return ((LocalTaskRunnerRunnable) input).getTaskRunnerWorkItem(); + if (input instanceof LocalTaskRunnerCallable) { + return ((LocalTaskRunnerCallable) input).getTaskRunnerWorkItem(); } return null; } @@ -121,25 +116,23 @@ public class LocalTaskRunner implements TaskRunner return Lists.newArrayList(); } - private static class LocalTaskRunnerRunnable implements Runnable + private static class LocalTaskRunnerCallable implements Callable { private final Task task; private final TaskToolbox toolbox; - private final TaskCallback callback; private final DateTime createdTime; - public LocalTaskRunnerRunnable(Task task, TaskToolbox toolbox, TaskCallback callback) + public LocalTaskRunnerCallable(Task task, TaskToolbox toolbox) { this.task = task; this.toolbox = toolbox; - this.callback = callback; this.createdTime = new DateTime(); } @Override - public void run() + public TaskStatus call() { final long startTime = System.currentTimeMillis(); @@ -175,7 +168,7 @@ public class LocalTaskRunner implements TaskRunner } try { - callback.notify(status.withDuration(System.currentTimeMillis() - startTime)); + return status.withDuration(System.currentTimeMillis() - startTime); } catch(Exception e) { log.error(e, "Uncaught Exception during callback for task[%s]", task); throw Throwables.propagate(e); @@ -186,7 +179,7 @@ public class LocalTaskRunner implements TaskRunner { return new TaskRunnerWorkItem( task, - callback, + null, null, createdTime ); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 957124b1afe..ac7c05f7b01 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -27,12 +27,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.metamx.common.ISE; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.merger.common.RetryPolicyFactory; -import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; @@ -211,19 +212,19 @@ public class RemoteTaskRunner implements TaskRunner /** * A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task. * - * @param task task to run - * @param callback callback to be called exactly once + * @param task task to run */ @Override - public void run(Task task, TaskCallback callback) + public ListenableFuture run(final Task task) { if (runningTasks.containsKey(task.getId()) || pendingTasks.containsKey(task.getId())) { throw new ISE("Assigned a task[%s] that is already running or pending, WTF is happening?!", task.getId()); } TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem( - task, callback, retryPolicyFactory.makeRetryPolicy(), new DateTime() + task, SettableFuture.create(), retryPolicyFactory.makeRetryPolicy(), new DateTime() ); addPendingTask(taskRunnerWorkItem); + return taskRunnerWorkItem.getResult(); } private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem) @@ -447,9 +448,9 @@ public class RemoteTaskRunner implements TaskRunner if (taskStatus.isComplete()) { if (taskRunnerWorkItem != null) { - final TaskCallback callback = taskRunnerWorkItem.getCallback(); - if (callback != null) { - callback.notify(taskStatus); + final SettableFuture result = taskRunnerWorkItem.getResult(); + if (result != null) { + result.set(taskStatus); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunner.java index 756c4b793fd..4b5608448d0 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunner.java @@ -19,7 +19,8 @@ package com.metamx.druid.merger.coordinator; -import com.metamx.druid.merger.common.TaskCallback; +import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.task.Task; import java.util.Collection; @@ -31,13 +32,12 @@ import java.util.Collection; public interface TaskRunner { /** - * Run a task with a particular context and call a callback. The callback may be called multiple times with RUNNING - * status, but should be called exactly once with a non-RUNNING status (e.g. SUCCESS, FAILED, CONTINUED...). + * Run a task. The returned status should be some kind of completed status. * * @param task task to run - * @param callback callback to be called exactly once + * @return task status, eventually */ - public void run(Task task, TaskCallback callback); + public ListenableFuture run(Task task); public Collection getRunningTasks(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunnerWorkItem.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunnerWorkItem.java index d850c93d119..d799c23e303 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunnerWorkItem.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunnerWorkItem.java @@ -20,8 +20,9 @@ package com.metamx.druid.merger.coordinator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.util.concurrent.SettableFuture; import com.metamx.druid.merger.common.RetryPolicy; -import com.metamx.druid.merger.common.TaskCallback; +import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.task.Task; import org.joda.time.DateTime; import org.joda.time.DateTimeComparator; @@ -32,7 +33,7 @@ import org.joda.time.DateTimeComparator; public class TaskRunnerWorkItem implements Comparable { private final Task task; - private final TaskCallback callback; + private final SettableFuture result; private final RetryPolicy retryPolicy; private final DateTime createdTime; @@ -40,13 +41,13 @@ public class TaskRunnerWorkItem implements Comparable public TaskRunnerWorkItem( Task task, - TaskCallback callback, + SettableFuture result, RetryPolicy retryPolicy, DateTime createdTime ) { this.task = task; - this.callback = callback; + this.result = result; this.retryPolicy = retryPolicy; this.createdTime = createdTime; } @@ -57,9 +58,9 @@ public class TaskRunnerWorkItem implements Comparable return task; } - public TaskCallback getCallback() + public SettableFuture getResult() { - return callback; + return result; } public RetryPolicy getRetryPolicy() @@ -91,46 +92,12 @@ public class TaskRunnerWorkItem implements Comparable return DateTimeComparator.getInstance().compare(createdTime, taskRunnerWorkItem.getCreatedTime()); } - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - TaskRunnerWorkItem that = (TaskRunnerWorkItem) o; - - if (callback != null ? !callback.equals(that.callback) : that.callback != null) { - return false; - } - if (retryPolicy != null ? !retryPolicy.equals(that.retryPolicy) : that.retryPolicy != null) { - return false; - } - if (task != null ? !task.equals(that.task) : that.task != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - int result = task != null ? task.hashCode() : 0; - result = 31 * result + (callback != null ? callback.hashCode() : 0); - result = 31 * result + (retryPolicy != null ? retryPolicy.hashCode() : 0); - return result; - } - @Override public String toString() { return "TaskRunnerWorkItem{" + "task=" + task + - ", callback=" + callback + + ", result=" + result + ", retryPolicy=" + retryPolicy + ", createdTime=" + createdTime + '}'; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java index 8289229aaf4..dc39f6e3282 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java @@ -20,9 +20,11 @@ package com.metamx.druid.merger.coordinator.exec; import com.google.common.base.Throwables; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.actions.TaskActionClientFactory; import com.metamx.druid.merger.common.task.Task; @@ -138,15 +140,34 @@ public class TaskConsumer implements Runnable } // Hand off work to TaskRunner, with a callback - runner.run( - task, new TaskCallback() + final ListenableFuture status = runner.run(task); + + Futures.addCallback( + status, new FutureCallback() { @Override - public void notify(final TaskStatus statusFromRunner) + public void onSuccess(final TaskStatus status) + { + log.info("Received %s status for task: %s", status.getStatusCode(), task); + handleStatus(status); + } + + @Override + public void onFailure(Throwable t) + { + log.makeAlert(t, "Failed to run task") + .addData("task", task.getId()) + .addData("type", task.getType()) + .addData("dataSource", task.getDataSource()) + .addData("interval", task.getImplicitLockInterval()) + .emit(); + + handleStatus(TaskStatus.failure(task.getId())); + } + + private void handleStatus(TaskStatus status) { try { - log.info("Received %s status for task: %s", statusFromRunner.getStatusCode(), task); - // If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set after // we check and before we commit the database transaction, but better than nothing. if (shutdown) { @@ -154,34 +175,25 @@ public class TaskConsumer implements Runnable return; } - queue.notify(task, statusFromRunner); + queue.notify(task, status); // Emit event and log, if the task is done - if (statusFromRunner.isComplete()) { - metricBuilder.setUser3(statusFromRunner.getStatusCode().toString()); - emitter.emit(metricBuilder.build("indexer/time/run/millis", statusFromRunner.getDuration())); - - if (statusFromRunner.isFailure()) { - log.makeAlert("Failed to index") - .addData("task", task.getId()) - .addData("type", task.getType()) - .addData("dataSource", task.getDataSource()) - .addData("interval", task.getImplicitLockInterval()) - .emit(); - } + if (status.isComplete()) { + metricBuilder.setUser3(status.getStatusCode().toString()); + emitter.emit(metricBuilder.build("indexer/time/run/millis", status.getDuration())); log.info( "Task %s: %s (%d run duration)", - statusFromRunner.getStatusCode(), + status.getStatusCode(), task, - statusFromRunner.getDuration() + status.getDuration() ); } } catch (Exception e) { - log.makeAlert(e, "Failed to handle task callback") + log.makeAlert(e, "Failed to handle task status") .addData("task", task.getId()) - .addData("statusCode", statusFromRunner.getStatusCode()) + .addData("statusCode", status.getStatusCode()) .emit(); } } diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index a3980820268..81e073519ad 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -4,19 +4,20 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.base.Throwables; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.metamx.common.ISE; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.merger.TestTask; import com.metamx.druid.merger.common.RetryPolicyFactory; -import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.config.IndexerZkConfig; +import com.metamx.druid.merger.common.config.RetryPolicyConfig; import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; -import com.metamx.druid.merger.common.config.RetryPolicyConfig; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.worker.Worker; import com.metamx.druid.merger.worker.WorkerCuratorCoordinator; @@ -130,10 +131,7 @@ public class RemoteTaskRunnerTest @Test public void testRunNoExistingTask() throws Exception { - remoteTaskRunner.run( - task1, - null - ); + remoteTaskRunner.run(task1); } @Test @@ -146,11 +144,10 @@ public class RemoteTaskRunnerTest task1.getSegments(), Lists.newArrayList(), TaskStatus.running(task1.getId()) - ), - null + ) ); try { - remoteTaskRunner.run(task1, null); + remoteTaskRunner.run(task1); fail("ISE expected"); } catch (ISE expected) { @@ -182,8 +179,7 @@ public class RemoteTaskRunnerTest ), Lists.newArrayList(), TaskStatus.success("foo") - ), - null + ) ); EasyMock.verify(emitter); } @@ -192,22 +188,30 @@ public class RemoteTaskRunnerTest public void testRunWithCallback() throws Exception { final MutableBoolean callbackCalled = new MutableBoolean(false); - remoteTaskRunner.run( - new TestTask( - task1.getId(), - task1.getDataSource(), - task1.getSegments(), - Lists.newArrayList(), - TaskStatus.running(task1.getId()) - ), - new TaskCallback() - { - @Override - public void notify(TaskStatus status) - { - callbackCalled.setValue(true); - } - } + + Futures.addCallback( + remoteTaskRunner.run( + new TestTask( + task1.getId(), + task1.getDataSource(), + task1.getSegments(), + Lists.newArrayList(), + TaskStatus.running(task1.getId()) + ) + ), new FutureCallback() + { + @Override + public void onSuccess(TaskStatus taskStatus) + { + callbackCalled.setValue(true); + } + + @Override + public void onFailure(Throwable throwable) + { + // neg + } + } ); // Really don't like this way of waiting for the task to appear