RemoteTaskRunner should respect worker version changes (fixes #787).

This commit is contained in:
Gian Merlino 2014-10-12 11:26:46 -07:00
parent 8304365948
commit e1fedbe741
4 changed files with 72 additions and 17 deletions

View File

@ -32,7 +32,7 @@ public class ImmutableZkWorker
{
private final Worker worker;
private final int currCapacityUsed;
private final Set<String> availabilityGroups;
private final ImmutableSet<String> availabilityGroups;
public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set<String> availabilityGroups)
{

View File

@ -164,7 +164,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
@Override
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
{
Worker worker;
final Worker worker;
switch (event.getType()) {
case CHILD_ADDED:
worker = jsonMapper.readValue(
@ -198,6 +198,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
}
);
break;
case CHILD_UPDATED:
worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
updateWorker(worker);
break;
case CHILD_REMOVED:
worker = jsonMapper.readValue(
event.getData().getData(),
@ -745,6 +753,24 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
}
}
/**
* We allow workers to change their own capacities and versions. They cannot change their own hosts or ips without
* dropping themselves and re-announcing.
*/
private void updateWorker(final Worker worker)
{
final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
log.info("Worker[%s] updated its announcement from[%s] to[%s].", worker.getHost(), zkWorker.getWorker(), worker);
zkWorker.setWorker(worker);
} else {
log.warn(
"WTF, worker[%s] updated its announcement but we didn't have a ZkWorker for it. Ignoring.",
worker.getHost()
);
}
}
/**
* When a ephemeral worker node disappears from ZK, incomplete running tasks will be retried by
* the logic in the status listener. We still have to make sure there are no tasks assigned

View File

@ -22,11 +22,11 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import org.apache.curator.framework.recipes.cache.ChildData;
@ -46,15 +46,15 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class ZkWorker implements Closeable
{
private final Worker worker;
private final PathChildrenCache statusCache;
private final Function<ChildData, TaskAnnouncement> cacheConverter;
private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
this.worker = worker;
this.worker = new AtomicReference<>(worker);
this.statusCache = statusCache;
this.cacheConverter = new Function<ChildData, TaskAnnouncement>()
{
@ -84,7 +84,7 @@ public class ZkWorker implements Closeable
@JsonProperty("worker")
public Worker getWorker()
{
return worker;
return worker.get();
}
@JsonProperty("runningTasks")
@ -137,30 +137,28 @@ public class ZkWorker implements Closeable
return getRunningTasks().containsKey(taskId);
}
public boolean isAtCapacity()
{
return getCurrCapacityUsed() >= worker.getCapacity();
}
public boolean isValidVersion(String minVersion)
{
return worker.getVersion().compareTo(minVersion) >= 0;
return worker.get().getVersion().compareTo(minVersion) >= 0;
}
public boolean canRunTask(Task task)
public void setWorker(Worker newWorker)
{
return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
final Worker oldWorker = worker.get();
Preconditions.checkArgument(newWorker.getHost().equals(oldWorker.getHost()), "Cannot change Worker host");
Preconditions.checkArgument(newWorker.getIp().equals(oldWorker.getIp()), "Cannot change Worker ip");
worker.set(newWorker);
}
public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
lastCompletedTaskTime.getAndSet(completedTaskTime);
lastCompletedTaskTime.set(completedTaskTime);
}
public ImmutableZkWorker toImmutable()
{
return new ImmutableZkWorker(worker, getCurrCapacityUsed(), getAvailabilityGroups());
return new ImmutableZkWorker(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups());
}
@Override

View File

@ -361,6 +361,29 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
}
@Test
public void testWorkerDisabled() throws Exception
{
doSetup();
final ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
// Disable while task running
disableWorker();
// Continue test
mockWorkerCompleteSuccessfulTask(task);
Assert.assertTrue(workerCompletedTask(result));
Assert.assertEquals(task.getId(), result.get().getId());
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
// Confirm RTR thinks the worker is disabled.
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion());
}
private void doSetup() throws Exception
{
makeWorker();
@ -405,6 +428,14 @@ public class RemoteTaskRunnerTest
);
}
private void disableWorker() throws Exception
{
cf.setData().forPath(
announcementsPath,
jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""))
);
}
private boolean taskAnnounced(final String taskId)
{
return pathExists(joiner.join(tasksPath, taskId));