RemoteTaskRunner: Fix issues leading to failing tests

This commit is contained in:
Gian Merlino 2013-12-12 13:40:23 -08:00
parent 81ccb37019
commit be25d51a2c
2 changed files with 73 additions and 32 deletions

View File

@ -32,6 +32,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.InputSupplier; import com.google.common.io.InputSupplier;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE; import com.metamx.common.ISE;
@ -51,6 +53,7 @@ import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker; import io.druid.indexing.worker.Worker;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import io.druid.tasklogs.TaskLogStreamer; import io.druid.tasklogs.TaskLogStreamer;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@ -72,7 +75,6 @@ import java.util.TreeSet;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -153,7 +155,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return; return;
} }
final CountDownLatch initialized = new CountDownLatch(1); final MutableInt waitingFor = new MutableInt(1);
final Object waitingForMonitor = new Object();
// Add listener for creation/deletion of workers // Add listener for creation/deletion of workers
workerPathCache.getListenable().addListener( workerPathCache.getListenable().addListener(
@ -169,7 +172,32 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
event.getData().getData(), event.getData().getData(),
Worker.class Worker.class
); );
addWorker(worker); synchronized (waitingForMonitor) {
waitingFor.increment();
}
Futures.addCallback(
addWorker(worker),
new FutureCallback<ZkWorker>()
{
@Override
public void onSuccess(ZkWorker zkWorker)
{
synchronized (waitingForMonitor) {
waitingFor.decrement();
waitingForMonitor.notifyAll();
}
}
@Override
public void onFailure(Throwable throwable)
{
synchronized (waitingForMonitor) {
waitingFor.decrement();
waitingForMonitor.notifyAll();
}
}
}
);
break; break;
case CHILD_REMOVED: case CHILD_REMOVED:
worker = jsonMapper.readValue( worker = jsonMapper.readValue(
@ -179,16 +207,22 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
removeWorker(worker); removeWorker(worker);
break; break;
case INITIALIZED: case INITIALIZED:
initialized.countDown(); synchronized (waitingForMonitor) {
waitingFor.decrement();
waitingForMonitor.notifyAll();
}
default: default:
break; break;
} }
} }
} }
); );
workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
initialized.await(); synchronized (waitingForMonitor) {
while (waitingFor.intValue() > 0) {
waitingForMonitor.wait();
}
}
started = true; started = true;
} }
catch (Exception e) { catch (Exception e) {
@ -263,8 +297,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
@Override @Override
public ListenableFuture<TaskStatus> run(final Task task) public ListenableFuture<TaskStatus> run(final Task task)
{ {
final RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId()); final RemoteTaskRunnerWorkItem completeTask, runningTask, pendingTask;
if (runningTask != null) { if ((pendingTask = pendingTasks.get(task.getId())) != null) {
log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
return pendingTask.getResult();
} else if ((runningTask = runningTasks.get(task.getId())) != null) {
ZkWorker zkWorker = findWorkerRunningTask(task.getId()); ZkWorker zkWorker = findWorkerRunningTask(task.getId());
if (zkWorker == null) { if (zkWorker == null) {
log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId()); log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());
@ -275,16 +312,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
taskComplete(runningTask, zkWorker, announcement.getTaskStatus()); taskComplete(runningTask, zkWorker, announcement.getTaskStatus());
} }
} }
return runningTask.getResult(); return runningTask.getResult();
} else if ((completeTask = completeTasks.get(task.getId())) != null) {
return completeTask.getResult();
} else {
return addPendingTask(task).getResult();
} }
RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId());
if (pendingTask != null) {
log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
return pendingTask.getResult();
}
return addPendingTask(task).getResult();
} }
/** /**
@ -573,14 +606,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
* The RemoteTaskRunner updates state according to these changes. * The RemoteTaskRunner updates state according to these changes.
* *
* @param worker contains metadata for a worker that has appeared in ZK * @param worker contains metadata for a worker that has appeared in ZK
* @return future that will contain a fully initialized worker
*/ */
private ZkWorker addWorker(final Worker worker) private ListenableFuture<ZkWorker> addWorker(final Worker worker)
{ {
log.info("Worker[%s] reportin' for duty!", worker.getHost()); log.info("Worker[%s] reportin' for duty!", worker.getHost());
try { try {
final String workerStatusPath = JOINER.join(zkPaths.getIndexerStatusPath(), worker.getHost()); final String workerStatusPath = JOINER.join(zkPaths.getIndexerStatusPath(), worker.getHost());
final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath); final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
final SettableFuture<ZkWorker> retVal = SettableFuture.create();
final ZkWorker zkWorker = new ZkWorker( final ZkWorker zkWorker = new ZkWorker(
worker, worker,
statusCache, statusCache,
@ -649,11 +684,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} }
break; break;
case INITIALIZED: case INITIALIZED:
if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) != null) { if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
log.makeAlert("WTF?! Tried to add already-existing worker[%s]", worker.getHost()) retVal.set(zkWorker);
} else {
final String message = String.format(
"WTF?! Tried to add already-existing worker[%s]",
worker.getHost()
);
log.makeAlert(message)
.addData("workerHost", worker.getHost()) .addData("workerHost", worker.getHost())
.addData("workerIp", worker.getIp()) .addData("workerIp", worker.getIp())
.emit(); .emit();
retVal.setException(new IllegalStateException(message));
} }
runPendingTasks(); runPendingTasks();
} }
@ -669,7 +711,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} }
); );
zkWorker.start(); zkWorker.start();
return zkWorker; return retVal;
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
@ -777,6 +819,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem"); Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
Preconditions.checkNotNull(zkWorker, "zkWorker"); Preconditions.checkNotNull(zkWorker, "zkWorker");
Preconditions.checkNotNull(taskStatus, "taskStatus"); Preconditions.checkNotNull(taskStatus, "taskStatus");
log.info(
"Worker[%s] completed task[%s] with status[%s]",
zkWorker.getWorker().getHost(),
taskStatus.getId(),
taskStatus.getStatusCode()
);
// Worker is done with this task // Worker is done with this task
zkWorker.setLastCompletedTaskTime(new DateTime()); zkWorker.setLastCompletedTaskTime(new DateTime());
// Move from running -> complete // Move from running -> complete

View File

@ -23,8 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
@ -55,7 +55,6 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -303,16 +302,13 @@ public class RemoteTaskRunnerTest
doSetup(); doSetup();
Set<String> existingTasks = Sets.newHashSet(); final Set<String> existingTasks = Sets.newHashSet();
for (ZkWorker zkWorker : remoteTaskRunner.getWorkers()) { for (ZkWorker zkWorker : remoteTaskRunner.getWorkers()) {
existingTasks.addAll(zkWorker.getRunningTasks().keySet()); existingTasks.addAll(zkWorker.getRunningTasks().keySet());
} }
Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks);
Assert.assertTrue(existingTasks.size() == 2); final Set<String> runningTasks = Sets.newHashSet(
Assert.assertTrue(existingTasks.contains("first"));
Assert.assertTrue(existingTasks.contains("second"));
Set<String> runningTasks = Sets.newHashSet(
Iterables.transform( Iterables.transform(
remoteTaskRunner.getRunningTasks(), remoteTaskRunner.getRunningTasks(),
new Function<RemoteTaskRunnerWorkItem, String>() new Function<RemoteTaskRunnerWorkItem, String>()
@ -325,10 +321,7 @@ public class RemoteTaskRunnerTest
} }
) )
); );
Assert.assertEquals("runningTasks", ImmutableSet.of("first", "second"), runningTasks);
Assert.assertTrue(runningTasks.size() == 1);
Assert.assertTrue(runningTasks.contains("second"));
Assert.assertFalse(runningTasks.contains("first"));
} }
@Test @Test