RemoteTaskRunner: make worker task-cleanup scheduling cancellable and self-unregistering.

* cleanupExec is wrapped with MoreExecutors.listeningDecorator(...), so a scheduled cleanup is a
  ListenableScheduledFuture that can carry a completion callback.
* The scan that schedules cleanup for workers with a status znode but no announcement moves out of
  the startup sequence (just before "started = true") into the INITIALIZED case of the event switch.
  A failure while scheduling one worker's cleanup is caught, logged as a warning, and skipped.
* cancelWorkerCleanup(String) is extracted from the "reportin' for duty" path and is also called at
  the top of scheduleTasksCleanupForWorker, so a previously scheduled cleanup for the same worker is
  cancelled before a new one is scheduled.
* The cleanup task no longer removes its own map entry in a finally block. A FutureCallback removes
  it with remove(worker, cleanupTask), which only removes the entry if it still maps to that task.
* The alert emitted when the cleanup task fails carries the caught exception.

diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
index 008729d8969..8c32c22a5e6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
@@ -35,6 +35,9 @@ import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.metamx.common.ISE;
 import com.metamx.common.RE;
@@ -140,7 +143,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
 
   private volatile boolean started = false;
 
-  private final ScheduledExecutorService cleanupExec;
+  private final ListeningScheduledExecutorService cleanupExec;
 
   private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
 
@@ -164,7 +167,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
     this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath());
     this.httpClient = httpClient;
     this.workerConfigRef = workerConfigRef;
-    this.cleanupExec = cleanupExec;
+    this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec);
   }
 
   @LifecycleStart
@@ -235,6 +238,32 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
                   removeWorker(worker);
                   break;
                 case INITIALIZED:
+                  // Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
+                  List<String> workers;
+                  try {
+                    workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
+                  }
+                  catch (KeeperException.NoNodeException e) {
+                    // statusPath doesn't exist yet; can occur if no middleManagers have started.
+                    workers = ImmutableList.of();
+                  }
+                  for (String workerId : workers) {
+                    final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
+                    final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
+                    if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
+                      try {
+                        scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
+                      }
+                      catch (Exception e) {
+                        log.warn(
+                            e,
+                            "Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
+                            workerId,
+                            workerStatusPath
+                        );
+                      }
+                    }
+                  }
                   synchronized (waitingForMonitor) {
                     waitingFor.decrement();
                     waitingForMonitor.notifyAll();
@@ -251,26 +280,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
           waitingForMonitor.wait();
         }
       }
-      // Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
-      List<String> workers;
-      try {
-        workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
-      }
-      catch (KeeperException.NoNodeException e) {
-        // statusPath doesn't exist yet; can occur if no middleManagers have started.
-        workers = ImmutableList.of();
-      }
-      for (String worker : workers) {
-        if (!zkWorkers.containsKey(worker)
-            && cf.checkExists().forPath(JOINER.join(indexerZkConfig.getAnnouncementsPath(), worker)) == null) {
-          scheduleTasksCleanupForWorker(
-              worker,
-              cf.getChildren()
-                .forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker))
-          );
-        }
-      }
-
       started = true;
     }
     catch (Exception e) {
@@ -698,6 +707,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
     }
   }
 
+  private boolean cancelWorkerCleanup(String workerHost)
+  {
+    ScheduledFuture previousCleanup = removedWorkerCleanups.remove(workerHost);
+    if (previousCleanup != null) {
+      log.info("Cancelling Worker[%s] scheduled task cleanup", workerHost);
+      previousCleanup.cancel(false);
+    }
+    return previousCleanup != null;
+  }
+
   /**
    * When a new worker appears, listeners are registered for status changes associated with tasks assigned to
    * the worker. Status changes indicate the creation or completion of a task.
@@ -712,11 +731,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
     log.info("Worker[%s] reportin' for duty!", worker.getHost());
 
     try {
-      ScheduledFuture previousCleanup = removedWorkerCleanups.remove(worker.getHost());
-      if (previousCleanup != null) {
-        log.info("Cancelling Worker[%s] scheduled task cleanup", worker.getHost());
-        previousCleanup.cancel(false);
-      }
+      cancelWorkerCleanup(worker.getHost());
 
       final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker.getHost());
       final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
@@ -880,55 +895,79 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
     lazyWorkers.remove(worker.getHost());
   }
 
+  /**
+   * Schedule a task that will, at some point in the future, clean up znodes and issue failures for "tasksToFail"
+   * if they are being run by "worker".
+   */
   private void scheduleTasksCleanupForWorker(final String worker, final List<String> tasksToFail)
   {
-    removedWorkerCleanups.put(
-        worker, cleanupExec.schedule(
-            new Runnable()
-            {
-              @Override
-              public void run()
-              {
-                log.info("Running scheduled cleanup for Worker[%s]", worker);
-                try {
-                  for (String assignedTask : tasksToFail) {
-                    String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
-                    String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
-                    if (cf.checkExists().forPath(taskPath) != null) {
-                      cf.delete().guaranteed().forPath(taskPath);
-                    }
+    // This method is only called from the PathChildrenCache event handler, so this may look like a race,
+    // but is actually not.
+    cancelWorkerCleanup(worker);
 
-                    if (cf.checkExists().forPath(statusPath) != null) {
-                      cf.delete().guaranteed().forPath(statusPath);
-                    }
-
-                    log.info("Failing task[%s]", assignedTask);
-                    RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
-                    if (taskRunnerWorkItem != null) {
-                      taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId()));
-                    } else {
-                      log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
-                    }
-                  }
-
-                  // worker is gone, remove worker task status announcements path.
-                  String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
-                  if (cf.checkExists().forPath(workerStatusPath) != null) {
-                    cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
-                  }
+    final ListenableScheduledFuture<?> cleanupTask = cleanupExec.schedule(
+        new Runnable()
+        {
+          @Override
+          public void run()
+          {
+            log.info("Running scheduled cleanup for Worker[%s]", worker);
+            try {
+              for (String assignedTask : tasksToFail) {
+                String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
+                String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
+                if (cf.checkExists().forPath(taskPath) != null) {
+                  cf.delete().guaranteed().forPath(taskPath);
                 }
-                catch (Exception e) {
-                  log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
-                  throw Throwables.propagate(e);
+
+                if (cf.checkExists().forPath(statusPath) != null) {
+                  cf.delete().guaranteed().forPath(statusPath);
                 }
-                finally {
-                  removedWorkerCleanups.remove(worker);
+
+                log.info("Failing task[%s]", assignedTask);
+                RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
+                if (taskRunnerWorkItem != null) {
+                  taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId()));
+                } else {
+                  log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
                 }
               }
-            },
-            config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
-            TimeUnit.MILLISECONDS
-        )
+
+              // worker is gone, remove worker task status announcements path.
+              String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
+              if (cf.checkExists().forPath(workerStatusPath) != null) {
+                cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
+              }
+            }
+            catch (Exception e) {
+              log.makeAlert(e, "Exception while cleaning up worker[%s]", worker).emit();
+              throw Throwables.propagate(e);
+            }
+          }
+        },
+        config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    removedWorkerCleanups.put(worker, cleanupTask);
+
+    // Remove this entry from removedWorkerCleanups when done, if it's actually the one in there.
+    Futures.addCallback(
+        cleanupTask,
+        new FutureCallback<Object>()
+        {
+          @Override
+          public void onSuccess(Object result)
+          {
+            removedWorkerCleanups.remove(worker, cleanupTask);
+          }
+
+          @Override
+          public void onFailure(Throwable t)
+          {
+            removedWorkerCleanups.remove(worker, cleanupTask);
+          }
+        }
     );
   }
 
@@ -1009,7 +1048,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
     return assignedTasks;
   }
 
-  // Used for tests
   public List getLazyWorkers()
   {
     return ImmutableList.copyOf(lazyWorkers.values());