diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index ca052d320aa..c3e26bd5fac 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -481,10 +481,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer return null; } - public boolean isWorkerRunningTask(Worker worker, String taskId) + public boolean isWorkerRunningTask(ZkWorker worker, String taskId) { - ZkWorker zkWorker = zkWorkers.get(worker.getHost()); - return (zkWorker != null && zkWorker.isRunningTask(taskId)); + return Preconditions.checkNotNull(worker, "worker").isRunningTask(taskId); } /** @@ -869,7 +868,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running // on a worker - this avoids overflowing a worker with tasks Stopwatch timeoutStopwatch = Stopwatch.createStarted(); - while (!isWorkerRunningTask(theZkWorker.getWorker(), task.getId())) { + while (!isWorkerRunningTask(theZkWorker, task.getId())) { final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis(); statusLock.wait(waitMs); long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS); diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java index 040dbf3500e..5d470102f13 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java @@ -27,16 +27,15 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; -import io.druid.java.util.common.concurrent.Execs; import io.druid.indexer.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskRunnerListener; import io.druid.java.util.common.Pair; +import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; - import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java index 435a5b4a6fc..7bcdff1530a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java @@ -21,6 +21,7 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.metamx.http.client.HttpClient; @@ -170,10 +171,17 @@ public class RemoteTaskRunnerTestUtils { cf.delete().forPath(joiner.join(tasksPath, workerId, task.getId())); + final String taskStatusPath = joiner.join(statusPath, workerId, task.getId()); TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId()), DUMMY_LOCATION); cf.create() .creatingParentsIfNeeded() - .forPath(joiner.join(statusPath, workerId, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); + .forPath(taskStatusPath, jsonMapper.writeValueAsBytes(taskAnnouncement)); + + Preconditions.checkNotNull( + cf.checkExists().forPath(taskStatusPath), + "Failed to write status on [%s]", + taskStatusPath + ); } void mockWorkerCompleteSuccessfulTask(final String workerId, final Task task) throws Exception