From be25d51a2c6b4ae45f4edca9faaca2e0b3f97a24 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 12 Dec 2013 13:40:23 -0800 Subject: [PATCH] RemoteTaskRunner: Fix issues leading to failing tests --- .../indexing/overlord/RemoteTaskRunner.java | 88 ++++++++++++++----- .../overlord/RemoteTaskRunnerTest.java | 17 ++-- 2 files changed, 73 insertions(+), 32 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 87bccea7cc9..8b823444319 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -32,6 +32,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.io.InputSupplier; import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.metamx.common.ISE; @@ -51,6 +53,7 @@ import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; import io.druid.server.initialization.ZkPathsConfig; import io.druid.tasklogs.TaskLogStreamer; +import org.apache.commons.lang.mutable.MutableInt; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -72,7 +75,6 @@ import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -153,7 +155,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return; } - final CountDownLatch initialized = new CountDownLatch(1); + final MutableInt waitingFor = new MutableInt(1); + final Object waitingForMonitor = new Object(); // Add listener for creation/deletion of workers workerPathCache.getListenable().addListener( @@ -169,7 +172,32 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer event.getData().getData(), Worker.class ); - addWorker(worker); + synchronized (waitingForMonitor) { + waitingFor.increment(); + } + Futures.addCallback( + addWorker(worker), + new FutureCallback() + { + @Override + public void onSuccess(ZkWorker zkWorker) + { + synchronized (waitingForMonitor) { + waitingFor.decrement(); + waitingForMonitor.notifyAll(); + } + } + + @Override + public void onFailure(Throwable throwable) + { + synchronized (waitingForMonitor) { + waitingFor.decrement(); + waitingForMonitor.notifyAll(); + } + } + } + ); break; case CHILD_REMOVED: worker = jsonMapper.readValue( @@ -179,16 +207,22 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer removeWorker(worker); break; case INITIALIZED: - initialized.countDown(); + synchronized (waitingForMonitor) { + waitingFor.decrement(); + waitingForMonitor.notifyAll(); + } default: break; } } } ); - workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); - initialized.await(); + synchronized (waitingForMonitor) { + while (waitingFor.intValue() > 0) { + waitingForMonitor.wait(); + } + } started = true; } catch (Exception e) { @@ -263,8 +297,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer @Override public ListenableFuture run(final Task task) { - final RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId()); - if (runningTask != null) { + final RemoteTaskRunnerWorkItem completeTask, runningTask, pendingTask; + if ((pendingTask = pendingTasks.get(task.getId())) != null) { + log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId()); + return pendingTask.getResult(); + } else if ((runningTask = runningTasks.get(task.getId())) != null) { ZkWorker zkWorker = findWorkerRunningTask(task.getId()); if (zkWorker == null) { log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId()); @@ -275,16 +312,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer taskComplete(runningTask, zkWorker, announcement.getTaskStatus()); } } - return runningTask.getResult(); + } else if ((completeTask = completeTasks.get(task.getId())) != null) { + return completeTask.getResult(); + } else { + return addPendingTask(task).getResult(); } - - RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId()); - if (pendingTask != null) { - log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId()); - return pendingTask.getResult(); - } - return addPendingTask(task).getResult(); } /** @@ -573,14 +606,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer * The RemoteTaskRunner updates state according to these changes. * * @param worker contains metadata for a worker that has appeared in ZK + * @return future that will contain a fully initialized worker */ - private ZkWorker addWorker(final Worker worker) + private ListenableFuture addWorker(final Worker worker) { log.info("Worker[%s] reportin' for duty!", worker.getHost()); try { final String workerStatusPath = JOINER.join(zkPaths.getIndexerStatusPath(), worker.getHost()); final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath); + final SettableFuture retVal = SettableFuture.create(); final ZkWorker zkWorker = new ZkWorker( worker, statusCache, @@ -649,11 +684,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer } break; case INITIALIZED: - if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) != null) { - log.makeAlert("WTF?! Tried to add already-existing worker[%s]", worker.getHost()) + if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) { + retVal.set(zkWorker); + } else { + final String message = String.format( + "WTF?! Tried to add already-existing worker[%s]", + worker.getHost() + ); + log.makeAlert(message) .addData("workerHost", worker.getHost()) .addData("workerIp", worker.getIp()) .emit(); + retVal.setException(new IllegalStateException(message)); } runPendingTasks(); } @@ -669,7 +711,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer } ); zkWorker.start(); - return zkWorker; + return retVal; } catch (Exception e) { throw Throwables.propagate(e); @@ -777,6 +819,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem"); Preconditions.checkNotNull(zkWorker, "zkWorker"); Preconditions.checkNotNull(taskStatus, "taskStatus"); + log.info( + "Worker[%s] completed task[%s] with status[%s]", + zkWorker.getWorker().getHost(), + taskStatus.getId(), + taskStatus.getStatusCode() + ); // Worker is done with this task zkWorker.setLastCompletedTaskTime(new DateTime()); // Move from running -> complete diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index fc0b9fdb2e2..fcf9715fe62 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -23,8 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.base.Function; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.metamx.emitter.EmittingLogger; @@ -55,7 +55,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.Arrays; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; @@ -303,16 +302,13 @@ public class RemoteTaskRunnerTest doSetup(); - Set existingTasks = Sets.newHashSet(); + final Set existingTasks = Sets.newHashSet(); for (ZkWorker zkWorker : remoteTaskRunner.getWorkers()) { existingTasks.addAll(zkWorker.getRunningTasks().keySet()); } + Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks); - Assert.assertTrue(existingTasks.size() == 2); - Assert.assertTrue(existingTasks.contains("first")); - Assert.assertTrue(existingTasks.contains("second")); - - Set runningTasks = Sets.newHashSet( + final Set runningTasks = Sets.newHashSet( Iterables.transform( remoteTaskRunner.getRunningTasks(), new Function() @@ -325,10 +321,7 @@ public class RemoteTaskRunnerTest } ) ); - - Assert.assertTrue(runningTasks.size() == 1); - Assert.assertTrue(runningTasks.contains("second")); - Assert.assertFalse(runningTasks.contains("first")); + Assert.assertEquals("runningTasks", ImmutableSet.of("first", "second"), runningTasks); } @Test