Merge pull request #1880 from gianm/rtr-adjust

RTR: Ensure that there is only one cleanup task scheduled for a worker at once.
This commit is contained in:
Fangjin Yang 2015-11-18 15:12:55 -08:00
commit e52c156066
1 changed files with 108 additions and 70 deletions

View File

@ -35,6 +35,9 @@ import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.RE; import com.metamx.common.RE;
@ -140,7 +143,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private volatile boolean started = false; private volatile boolean started = false;
private final ScheduledExecutorService cleanupExec; private final ListeningScheduledExecutorService cleanupExec;
private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
@ -164,7 +167,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath()); this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath());
this.httpClient = httpClient; this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef; this.workerConfigRef = workerConfigRef;
this.cleanupExec = cleanupExec; this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec);
} }
@LifecycleStart @LifecycleStart
@ -235,6 +238,32 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
removeWorker(worker); removeWorker(worker);
break; break;
case INITIALIZED: case INITIALIZED:
// Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
List<String> workers;
try {
workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
}
catch (KeeperException.NoNodeException e) {
// statusPath doesn't exist yet; can occur if no middleManagers have started.
workers = ImmutableList.of();
}
for (String workerId : workers) {
final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
try {
scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
}
catch (Exception e) {
log.warn(
e,
"Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
workerId,
workerStatusPath
);
}
}
}
synchronized (waitingForMonitor) { synchronized (waitingForMonitor) {
waitingFor.decrement(); waitingFor.decrement();
waitingForMonitor.notifyAll(); waitingForMonitor.notifyAll();
@ -251,26 +280,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
waitingForMonitor.wait(); waitingForMonitor.wait();
} }
} }
// Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
List<String> workers;
try {
workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
}
catch (KeeperException.NoNodeException e) {
// statusPath doesn't exist yet; can occur if no middleManagers have started.
workers = ImmutableList.of();
}
for (String worker : workers) {
if (!zkWorkers.containsKey(worker)
&& cf.checkExists().forPath(JOINER.join(indexerZkConfig.getAnnouncementsPath(), worker)) == null) {
scheduleTasksCleanupForWorker(
worker,
cf.getChildren()
.forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker))
);
}
}
started = true; started = true;
} }
catch (Exception e) { catch (Exception e) {
@ -698,6 +707,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} }
} }
private boolean cancelWorkerCleanup(String workerHost)
{
ScheduledFuture previousCleanup = removedWorkerCleanups.remove(workerHost);
if (previousCleanup != null) {
log.info("Cancelling Worker[%s] scheduled task cleanup", workerHost);
previousCleanup.cancel(false);
}
return previousCleanup != null;
}
/** /**
* When a new worker appears, listeners are registered for status changes associated with tasks assigned to * When a new worker appears, listeners are registered for status changes associated with tasks assigned to
* the worker. Status changes indicate the creation or completion of a task. * the worker. Status changes indicate the creation or completion of a task.
@ -712,11 +731,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
log.info("Worker[%s] reportin' for duty!", worker.getHost()); log.info("Worker[%s] reportin' for duty!", worker.getHost());
try { try {
ScheduledFuture previousCleanup = removedWorkerCleanups.remove(worker.getHost()); cancelWorkerCleanup(worker.getHost());
if (previousCleanup != null) {
log.info("Cancelling Worker[%s] scheduled task cleanup", worker.getHost());
previousCleanup.cancel(false);
}
final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker.getHost()); final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath); final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
@ -880,55 +895,79 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
lazyWorkers.remove(worker.getHost()); lazyWorkers.remove(worker.getHost());
} }
/**
* Schedule a task that will, at some point in the future, clean up znodes and issue failures for "tasksToFail"
* if they are being run by "worker".
*/
private void scheduleTasksCleanupForWorker(final String worker, final List<String> tasksToFail) private void scheduleTasksCleanupForWorker(final String worker, final List<String> tasksToFail)
{ {
removedWorkerCleanups.put( // This method is only called from the PathChildrenCache event handler, so this may look like a race,
worker, cleanupExec.schedule( // but is actually not.
new Runnable() cancelWorkerCleanup(worker);
{
@Override
public void run()
{
log.info("Running scheduled cleanup for Worker[%s]", worker);
try {
for (String assignedTask : tasksToFail) {
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
if (cf.checkExists().forPath(statusPath) != null) { final ListenableScheduledFuture<?> cleanupTask = cleanupExec.schedule(
cf.delete().guaranteed().forPath(statusPath); new Runnable()
} {
@Override
log.info("Failing task[%s]", assignedTask); public void run()
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask); {
if (taskRunnerWorkItem != null) { log.info("Running scheduled cleanup for Worker[%s]", worker);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId())); try {
} else { for (String assignedTask : tasksToFail) {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask); String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
} String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
} if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
// worker is gone, remove worker task status announcements path.
String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
if (cf.checkExists().forPath(workerStatusPath) != null) {
cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
}
} }
catch (Exception e) {
log.makeAlert("Exception while cleaning up worker[%s]", worker).emit(); if (cf.checkExists().forPath(statusPath) != null) {
throw Throwables.propagate(e); cf.delete().guaranteed().forPath(statusPath);
} }
finally {
removedWorkerCleanups.remove(worker); log.info("Failing task[%s]", assignedTask);
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId()));
} else {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
} }
} }
},
config.getTaskCleanupTimeout().toStandardDuration().getMillis(), // worker is gone, remove worker task status announcements path.
TimeUnit.MILLISECONDS String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
) if (cf.checkExists().forPath(workerStatusPath) != null) {
cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
}
}
catch (Exception e) {
log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
throw Throwables.propagate(e);
}
}
},
config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
);
removedWorkerCleanups.put(worker, cleanupTask);
// Remove this entry from removedWorkerCleanups when done, if it's actually the one in there.
Futures.addCallback(
cleanupTask,
new FutureCallback<Object>()
{
@Override
public void onSuccess(Object result)
{
removedWorkerCleanups.remove(worker, cleanupTask);
}
@Override
public void onFailure(Throwable t)
{
removedWorkerCleanups.remove(worker, cleanupTask);
}
}
); );
} }
@ -1009,7 +1048,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return assignedTasks; return assignedTasks;
} }
// Used for tests
public List<ZkWorker> getLazyWorkers() public List<ZkWorker> getLazyWorkers()
{ {
return ImmutableList.copyOf(lazyWorkers.values()); return ImmutableList.copyOf(lazyWorkers.values());