diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index d95b9e57429..fce401c6641 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -109,7 +109,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer tasks.put( task.getId(), new ForkingTaskRunnerWorkItem( - task, + task.getId(), exec.submit( new Callable() { @@ -427,11 +427,11 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer private volatile ProcessHolder processHolder = null; private ForkingTaskRunnerWorkItem( - Task task, + String taskId, ListenableFuture statusFuture ) { - super(task, statusFuture); + super(taskId, statusFuture); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index d42f32216a8..b6441049b2f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -52,7 +52,6 @@ import io.druid.indexing.worker.Worker; import io.druid.server.initialization.ZkPathsConfig; import io.druid.tasklogs.TaskLogStreamer; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; @@ -72,6 +71,8 @@ import java.util.Map; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -108,11 +109,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer private final HttpClient httpClient; // all workers that exist in ZK - private final Map zkWorkers = new ConcurrentHashMap(); + private final ConcurrentMap zkWorkers = new ConcurrentHashMap<>(); + // payloads of pending tasks, which we remember just long enough to assign to workers + private final ConcurrentMap pendingTaskPayloads = new ConcurrentHashMap<>(); + // tasks that have not yet been assigned to a worker + private final RemoteTaskRunnerWorkQueue pendingTasks = new RemoteTaskRunnerWorkQueue(); // all tasks that have been assigned to a worker private final RemoteTaskRunnerWorkQueue runningTasks = new RemoteTaskRunnerWorkQueue(); - // tasks that have not yet run - private final RemoteTaskRunnerWorkQueue pendingTasks = new RemoteTaskRunnerWorkQueue(); // tasks that are complete but not cleaned up yet private final RemoteTaskRunnerWorkQueue completeTasks = new RemoteTaskRunnerWorkQueue(); @@ -150,6 +153,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return; } + final CountDownLatch initialized = new CountDownLatch(1); + // Add listener for creation/deletion of workers workerPathCache.getListenable().addListener( new PathChildrenCacheListener() @@ -164,7 +169,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer event.getData().getData(), Worker.class ); - addWorker(worker, PathChildrenCache.StartMode.NORMAL); + addWorker(worker); break; case CHILD_REMOVED: worker = jsonMapper.readValue( @@ -173,22 +178,17 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer ); removeWorker(worker); break; + case INITIALIZED: + initialized.countDown(); default: break; } } } ); - workerPathCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - - for (ChildData childData : workerPathCache.getCurrentData()) { - final Worker worker = jsonMapper.readValue( - childData.getData(), - Worker.class - ); - addWorker(worker, PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - } + workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); + initialized.await(); started = true; } catch (Exception e) { @@ -249,11 +249,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return null; } - public boolean isWorkerRunningTask(Worker worker, Task task) + public boolean isWorkerRunningTask(Worker worker, String taskId) { ZkWorker zkWorker = zkWorkers.get(worker.getHost()); - - return (zkWorker != null && zkWorker.isRunningTask(task.getId())); + return (zkWorker != null && zkWorker.isRunningTask(taskId)); } /** @@ -264,7 +263,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer @Override public ListenableFuture run(final Task task) { - RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId()); + final RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId()); if (runningTask != null) { ZkWorker zkWorker = findWorkerRunningTask(task.getId()); if (zkWorker == null) { @@ -285,14 +284,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId()); return pendingTask.getResult(); } - - RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem( - task, - SettableFuture.create(), - null - ); - addPendingTask(taskRunnerWorkItem); - return taskRunnerWorkItem.getResult(); + return addPendingTask(task).getResult(); } /** @@ -391,12 +383,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer /** * Adds a task to the pending queue */ - private void addPendingTask(final RemoteTaskRunnerWorkItem taskRunnerWorkItem) + private RemoteTaskRunnerWorkItem addPendingTask(final Task task) { - log.info("Added pending task %s", taskRunnerWorkItem.getTask().getId()); - - pendingTasks.put(taskRunnerWorkItem.getTask().getId(), taskRunnerWorkItem); + log.info("Added pending task %s", task.getId()); + final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem( + task.getId(), + SettableFuture.create(), + null + ); + pendingTaskPayloads.put(task.getId(), task); + pendingTasks.put(task.getId(), taskRunnerWorkItem); runPendingTasks(); + return taskRunnerWorkItem; } /** @@ -413,11 +411,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer public Void call() throws Exception { try { - // make a copy of the pending tasks because assignTask may delete tasks from pending and move them + // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them // into running status List copy = Lists.newArrayList(pendingTasks.values()); - for (RemoteTaskRunnerWorkItem taskWrapper : copy) { - assignTask(taskWrapper); + for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) { + String taskId = taskRunnerWorkItem.getTaskId(); + if (tryAssignTask(pendingTaskPayloads.get(taskId), taskRunnerWorkItem)) { + pendingTaskPayloads.remove(taskId); + } } } catch (Exception e) { @@ -464,26 +465,34 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer * needs to bootstrap after a restart. * * @param taskRunnerWorkItem - the task to assign + * @return true iff the task is now assigned */ - private void assignTask(RemoteTaskRunnerWorkItem taskRunnerWorkItem) + private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) { try { - final String taskId = taskRunnerWorkItem.getTask().getId(); + Preconditions.checkNotNull(task, "task"); + Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem"); + Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id"); - if (runningTasks.containsKey(taskId) || findWorkerRunningTask(taskId) != null) { - log.info("Task[%s] already running.", taskId); + if (runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null) { + log.info("Task[%s] already running.", task.getId()); + return true; } else { // Nothing running this task, announce it in ZK for a worker to run it - ZkWorker zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask()); + ZkWorker zkWorker = findWorkerForTask(task); if (zkWorker != null) { - announceTask(zkWorker, taskRunnerWorkItem); + announceTask(task, zkWorker, taskRunnerWorkItem); + return true; + } else { + return false; } } } catch (Exception e) { log.makeAlert(e, "Exception while trying to run task") - .addData("taskId", taskRunnerWorkItem.getTask().getId()) + .addData("taskId", taskRunnerWorkItem.getTaskId()) .emit(); + return false; } } @@ -494,9 +503,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer * @param theZkWorker The worker the task is assigned to * @param taskRunnerWorkItem The task to be assigned */ - private void announceTask(ZkWorker theZkWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception + private void announceTask( + final Task task, + final ZkWorker theZkWorker, + final RemoteTaskRunnerWorkItem taskRunnerWorkItem + ) throws Exception { - final Task task = taskRunnerWorkItem.getTask(); + Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id"); final Worker theWorker = theZkWorker.getWorker(); log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId()); @@ -533,7 +546,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer Stopwatch timeoutStopwatch = new Stopwatch(); timeoutStopwatch.start(); synchronized (statusLock) { - while (!isWorkerRunningTask(theWorker, task)) { + while (!isWorkerRunningTask(theWorker, task.getId())) { final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis(); statusLock.wait(waitMs); long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS); @@ -558,9 +571,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer * the worker. Status changes indicate the creation or completion of a task. * The RemoteTaskRunner updates state according to these changes. * - * @param worker - contains metadata for a worker that has appeared in ZK + * @param worker contains metadata for a worker that has appeared in ZK */ - private ZkWorker addWorker(final Worker worker, PathChildrenCache.StartMode startMode) + private ZkWorker addWorker(final Worker worker) { log.info("Worker[%s] reportin' for duty!", worker.getHost()); @@ -580,8 +593,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - String taskId; - RemoteTaskRunnerWorkItem taskRunnerWorkItem; + final String taskId; + final RemoteTaskRunnerWorkItem taskRunnerWorkItem; synchronized (statusLock) { try { switch (event.getType()) { @@ -600,15 +613,23 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer ); // Synchronizing state with ZK - statusLock.notify(); + statusLock.notifyAll(); - taskRunnerWorkItem = runningTasks.get(taskId); - if (taskRunnerWorkItem == null) { + final RemoteTaskRunnerWorkItem tmp; + if ((tmp = runningTasks.get(taskId)) != null) { + taskRunnerWorkItem = tmp; + } else { log.warn( - "WTF?! Worker[%s] announcing a status for a task I didn't know about: %s", + "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s", zkWorker.getWorker().getHost(), taskId ); + taskRunnerWorkItem = new RemoteTaskRunnerWorkItem( + taskId, + SettableFuture.create(), + zkWorker.getWorker() + ); + runningTasks.put(taskId, taskRunnerWorkItem); } if (taskStatus.isComplete()) { @@ -621,11 +642,19 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer taskRunnerWorkItem = runningTasks.remove(taskId); if (taskRunnerWorkItem != null) { log.info("Task[%s] just disappeared!", taskId); - taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); + taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId())); } else { log.info("Task[%s] went bye bye.", taskId); } break; + case INITIALIZED: + if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) != null) { + log.makeAlert("WTF?! Tried to add already-existing worker[%s]", worker.getHost()) + .addData("workerHost", worker.getHost()) + .addData("workerIp", worker.getIp()) + .emit(); + } + runPendingTasks(); } } catch (Exception e) { @@ -638,12 +667,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer } } ); - - zkWorker.start(startMode); - zkWorkers.put(worker.getHost(), zkWorker); - - runPendingTasks(); - + zkWorker.start(); return zkWorker; } catch (Exception e) { @@ -690,7 +714,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer } log.info("Failing task[%s]", assignedTask); - taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); + taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId())); } else { log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask); } @@ -749,19 +773,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer TaskStatus taskStatus ) { + Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem"); + Preconditions.checkNotNull(zkWorker, "zkWorker"); + Preconditions.checkNotNull(taskStatus, "taskStatus"); // Worker is done with this task zkWorker.setLastCompletedTaskTime(new DateTime()); // Move from running -> complete - if (taskRunnerWorkItem != null) { - completeTasks.put(taskStatus.getId(), taskRunnerWorkItem); - } + completeTasks.put(taskStatus.getId(), taskRunnerWorkItem); runningTasks.remove(taskStatus.getId()); // Notify interested parties - if (taskRunnerWorkItem != null) { - final ListenableFuture result = taskRunnerWorkItem.getResult(); - if (result != null) { - ((SettableFuture) result).set(taskStatus); - } + final ListenableFuture result = taskRunnerWorkItem.getResult(); + if (result != null) { + ((SettableFuture) result).set(taskStatus); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java index 1c1dc7a17a9..76d373a049a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java @@ -21,7 +21,6 @@ package io.druid.indexing.overlord; import com.google.common.util.concurrent.SettableFuture; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.task.Task; import io.druid.indexing.worker.Worker; import org.joda.time.DateTime; @@ -33,25 +32,25 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem private final Worker worker; public RemoteTaskRunnerWorkItem( - Task task, + String taskId, SettableFuture result, Worker worker ) { - super(task, result); + super(taskId, result); this.result = result; this.worker = worker; } public RemoteTaskRunnerWorkItem( - Task task, + String taskId, SettableFuture result, DateTime createdTime, DateTime queueInsertionTime, Worker worker ) { - super(task, result, createdTime, queueInsertionTime); + super(taskId, result, createdTime, queueInsertionTime); this.result = result; this.worker = worker; } @@ -69,11 +68,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem @Override public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time) { - return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time, worker); + return new RemoteTaskRunnerWorkItem(getTaskId(), result, getCreatedTime(), time, worker); } public RemoteTaskRunnerWorkItem withWorker(Worker theWorker) { - return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), theWorker); + return new RemoteTaskRunnerWorkItem(getTaskId(), result, getCreatedTime(), getQueueInsertionTime(), theWorker); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 3f6708bd451..d486f37c0fc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -393,20 +393,6 @@ public class TaskLockbox } } - /** - * Removes all locks from this lockbox. - */ - public void clear() - { - giant.lock(); - - try { - running.clear(); - } finally { - giant.unlock(); - } - } - /** * Return the currently-active lock posses for some task. * diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index b2d1eedf049..96183d5ae64 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -117,6 +117,14 @@ public class TaskMaster .emit(); } leaderLifecycle.addManagedInstance(taskRunner); + if (taskRunner instanceof RemoteTaskRunner) { + final ScheduledExecutorFactory executorFactory = ScheduledExecutors.createFactory(leaderLifecycle); + resourceManagementScheduler = managementSchedulerFactory.build( + (RemoteTaskRunner) taskRunner, + executorFactory + ); + leaderLifecycle.addManagedInstance(resourceManagementScheduler); + } leaderLifecycle.addManagedInstance(taskQueue); leaderLifecycle.addHandler( new Lifecycle.Handler() @@ -134,14 +142,6 @@ public class TaskMaster } } ); - if (taskRunner instanceof RemoteTaskRunner) { - final ScheduledExecutorFactory executorFactory = ScheduledExecutors.createFactory(leaderLifecycle); - resourceManagementScheduler = managementSchedulerFactory.build( - (RemoteTaskRunner) taskRunner, - executorFactory - ); - leaderLifecycle.addManagedInstance(resourceManagementScheduler); - } try { leaderLifecycle.start(); leading = true; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java index f7b3c86874e..77e4b26e372 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java @@ -221,7 +221,7 @@ public class TaskQueue // Task futures available from the taskRunner final Map> runnerTaskFutures = Maps.newHashMap(); for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) { - runnerTaskFutures.put(workItem.getTask().getId(), workItem.getResult()); + runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); } // Attain futures for all active tasks (assuming they are ready to run). for (final Task task : tasks) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java index 4d4cac6ef70..a78faa24d03 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java @@ -19,11 +19,9 @@ package io.druid.indexing.overlord; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ComparisonChain; import com.google.common.util.concurrent.ListenableFuture; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.task.Task; import org.joda.time.DateTime; import org.joda.time.DateTimeComparator; @@ -32,36 +30,35 @@ import org.joda.time.DateTimeComparator; */ public class TaskRunnerWorkItem implements Comparable { - private final Task task; + private final String taskId; private final ListenableFuture result; private final DateTime createdTime; private final DateTime queueInsertionTime; public TaskRunnerWorkItem( - Task task, + String taskId, ListenableFuture result ) { - this(task, result, new DateTime(), new DateTime()); + this(taskId, result, new DateTime(), new DateTime()); } public TaskRunnerWorkItem( - Task task, + String taskId, ListenableFuture result, DateTime createdTime, DateTime queueInsertionTime ) { - this.task = task; + this.taskId = taskId; this.result = result; this.createdTime = createdTime; this.queueInsertionTime = queueInsertionTime; } - @JsonProperty - public Task getTask() + public String getTaskId() { - return task; + return taskId; } public ListenableFuture getResult() @@ -69,13 +66,11 @@ public class TaskRunnerWorkItem implements Comparable return result; } - @JsonProperty public DateTime getCreatedTime() { return createdTime; } - @JsonProperty public DateTime getQueueInsertionTime() { return queueInsertionTime; @@ -83,7 +78,7 @@ public class TaskRunnerWorkItem implements Comparable public TaskRunnerWorkItem withQueueInsertionTime(DateTime time) { - return new TaskRunnerWorkItem(task, result, createdTime, time); + return new TaskRunnerWorkItem(taskId, result, createdTime, time); } @Override @@ -91,7 +86,7 @@ public class TaskRunnerWorkItem implements Comparable { return ComparisonChain.start() .compare(createdTime, taskRunnerWorkItem.getCreatedTime(), DateTimeComparator.getInstance()) - .compare(task.getId(), taskRunnerWorkItem.getTask().getId()) + .compare(taskId, taskRunnerWorkItem.getTaskId()) .result(); } @@ -99,9 +94,10 @@ public class TaskRunnerWorkItem implements Comparable public String toString() { return "TaskRunnerWorkItem{" + - "task=" + task + + "taskId='" + taskId + '\'' + ", result=" + result + ", createdTime=" + createdTime + + ", queueInsertionTime=" + queueInsertionTime + '}'; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 14e3e94711a..a4a8db0d1a3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -19,7 +19,6 @@ package io.druid.indexing.overlord; -import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -46,7 +45,6 @@ import org.joda.time.Interval; import java.io.File; import java.util.Collection; -import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; @@ -58,7 +56,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker { private final TaskToolboxFactory toolboxFactory; private final ListeningExecutorService exec; - private final Set runningItems = new ConcurrentSkipListSet(); + private final Set runningItems = new ConcurrentSkipListSet<>(); private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class); @@ -82,8 +80,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker { final TaskToolbox toolbox = toolboxFactory.build(task); final ListenableFuture statusFuture = exec.submit(new ThreadPoolTaskRunnerCallable(task, toolbox)); - - final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture); + final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem = new ThreadPoolTaskRunnerWorkItem(task, statusFuture); runningItems.add(taskRunnerWorkItem); Futures.addCallback( statusFuture, new FutureCallback() @@ -109,7 +106,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker public void shutdown(final String taskid) { for (final TaskRunnerWorkItem runningItem : runningItems) { - if (runningItem.getTask().getId().equals(taskid)) { + if (runningItem.getTaskId().equals(taskid)) { runningItem.getResult().cancel(true); } } @@ -118,7 +115,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker @Override public Collection getRunningTasks() { - return ImmutableList.copyOf(runningItems); + return ImmutableList.copyOf(runningItems); } @Override @@ -130,7 +127,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker @Override public Collection getKnownTasks() { - return ImmutableList.copyOf(runningItems); + return ImmutableList.copyOf(runningItems); } @Override @@ -155,18 +152,8 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker { QueryRunner queryRunner = null; - final List runningTasks = Lists.transform( - ImmutableList.copyOf(getRunningTasks()), new Function() - { - @Override - public Task apply(TaskRunnerWorkItem o) - { - return o.getTask(); - } - } - ); - - for (final Task task : runningTasks) { + for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) { + final Task task = taskRunnerWorkItem.getTask(); if (task.getDataSource().equals(query.getDataSource())) { final QueryRunner taskQueryRunner = task.getQueryRunner(query); @@ -185,6 +172,25 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker return queryRunner == null ? new NoopQueryRunner() : queryRunner; } + private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem + { + private final Task task; + + private ThreadPoolTaskRunnerWorkItem( + Task task, + ListenableFuture result + ) + { + super(task.getId(), result); + this.task = task; + } + + public Task getTask() + { + return task; + } + } + private static class ThreadPoolTaskRunnerCallable implements Callable { private final Task task; @@ -242,10 +248,5 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker throw Throwables.propagate(e); } } - - public TaskRunnerWorkItem getTaskRunnerWorkItem() - { - return new TaskRunnerWorkItem(task, null); - } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java index edb0c3df685..335b5fa583d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java @@ -71,9 +71,9 @@ public class ZkWorker implements Closeable }; } - public void start(PathChildrenCache.StartMode startMode) throws Exception + public void start() throws Exception { - statusCache.start(startMode); + statusCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); } public void addListener(PathChildrenCacheListener listener) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 7ca0bfed2c9..d3d58bef03d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -70,9 +70,7 @@ public class OverlordResource public Map apply(TaskRunnerWorkItem input) { return new ImmutableMap.Builder() - .put("id", input.getTask().getId()) - .put("dataSource", input.getTask().getDataSource()) - .put("nodeType", input.getTask().getNodeType() == null ? "" : input.getTask().getNodeType()) + .put("id", input.getTaskId()) .put("createdTime", input.getCreatedTime()) .put("queueInsertionTime", input.getQueueInsertionTime()) .build(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 9ee11d418b4..fc0b9fdb2e2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -219,7 +219,7 @@ public class RemoteTaskRunnerTest ) ); - Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); + Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rt2")); } @Test @@ -266,7 +266,7 @@ public class RemoteTaskRunnerTest ) ); - Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); + Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rt2")); } @Test @@ -280,7 +280,7 @@ public class RemoteTaskRunnerTest Assert.assertTrue(workerRunningTask(task.getId())); - Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTask().getId().equals("task")); + Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTaskId().equals("task")); cf.delete().forPath(joiner.join(statusPath, task.getId())); @@ -320,7 +320,7 @@ public class RemoteTaskRunnerTest @Override public String apply(RemoteTaskRunnerWorkItem input) { - return input.getTask().getId(); + return input.getTaskId(); } } ) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java index 02ac9a21778..1f3f4a44eee 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java @@ -111,7 +111,7 @@ public class SimpleResourceManagementStrategyTest boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -139,7 +139,7 @@ public class SimpleResourceManagementStrategyTest boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -155,7 +155,7 @@ public class SimpleResourceManagementStrategyTest provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -196,7 +196,7 @@ public class SimpleResourceManagementStrategyTest boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -214,7 +214,7 @@ public class SimpleResourceManagementStrategyTest provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -248,7 +248,7 @@ public class SimpleResourceManagementStrategyTest boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null) @@ -278,7 +278,7 @@ public class SimpleResourceManagementStrategyTest boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null) @@ -293,7 +293,7 @@ public class SimpleResourceManagementStrategyTest terminatedSomething = simpleResourceManagementStrategy.doTerminate( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null)