Merge pull request #788 from metamx/rtr-worker-disablement

RemoteTaskRunner should respect worker version changes (fixes #787).
This commit is contained in:
FJ Yang 2014-10-12 14:59:51 -06:00
commit 441d28dea7
4 changed files with 72 additions and 17 deletions

View File

@ -32,7 +32,7 @@ public class ImmutableZkWorker
{ {
private final Worker worker; private final Worker worker;
private final int currCapacityUsed; private final int currCapacityUsed;
private final Set<String> availabilityGroups; private final ImmutableSet<String> availabilityGroups;
public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set<String> availabilityGroups) public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set<String> availabilityGroups)
{ {

View File

@ -164,7 +164,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
@Override @Override
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
{ {
Worker worker; final Worker worker;
switch (event.getType()) { switch (event.getType()) {
case CHILD_ADDED: case CHILD_ADDED:
worker = jsonMapper.readValue( worker = jsonMapper.readValue(
@ -198,6 +198,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} }
); );
break; break;
case CHILD_UPDATED:
worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
updateWorker(worker);
break;
case CHILD_REMOVED: case CHILD_REMOVED:
worker = jsonMapper.readValue( worker = jsonMapper.readValue(
event.getData().getData(), event.getData().getData(),
@ -745,6 +753,24 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} }
} }
/**
* We allow workers to change their own capacities and versions. They cannot change their own hosts or ips without
* dropping themselves and re-announcing.
*/
private void updateWorker(final Worker worker)
{
final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
log.info("Worker[%s] updated its announcement from[%s] to[%s].", worker.getHost(), zkWorker.getWorker(), worker);
zkWorker.setWorker(worker);
} else {
log.warn(
"WTF, worker[%s] updated its announcement but we didn't have a ZkWorker for it. Ignoring.",
worker.getHost()
);
}
}
/** /**
* When a ephemeral worker node disappears from ZK, incomplete running tasks will be retried by * When a ephemeral worker node disappears from ZK, incomplete running tasks will be retried by
* the logic in the status listener. We still have to make sure there are no tasks assigned * the logic in the status listener. We still have to make sure there are no tasks assigned

View File

@ -22,11 +22,11 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker; import io.druid.indexing.worker.Worker;
import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.ChildData;
@ -46,15 +46,15 @@ import java.util.concurrent.atomic.AtomicReference;
*/ */
public class ZkWorker implements Closeable public class ZkWorker implements Closeable
{ {
private final Worker worker;
private final PathChildrenCache statusCache; private final PathChildrenCache statusCache;
private final Function<ChildData, TaskAnnouncement> cacheConverter; private final Function<ChildData, TaskAnnouncement> cacheConverter;
private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime()); private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper) public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{ {
this.worker = worker; this.worker = new AtomicReference<>(worker);
this.statusCache = statusCache; this.statusCache = statusCache;
this.cacheConverter = new Function<ChildData, TaskAnnouncement>() this.cacheConverter = new Function<ChildData, TaskAnnouncement>()
{ {
@ -84,7 +84,7 @@ public class ZkWorker implements Closeable
@JsonProperty("worker") @JsonProperty("worker")
public Worker getWorker() public Worker getWorker()
{ {
return worker; return worker.get();
} }
@JsonProperty("runningTasks") @JsonProperty("runningTasks")
@ -137,30 +137,28 @@ public class ZkWorker implements Closeable
return getRunningTasks().containsKey(taskId); return getRunningTasks().containsKey(taskId);
} }
public boolean isAtCapacity()
{
return getCurrCapacityUsed() >= worker.getCapacity();
}
public boolean isValidVersion(String minVersion) public boolean isValidVersion(String minVersion)
{ {
return worker.getVersion().compareTo(minVersion) >= 0; return worker.get().getVersion().compareTo(minVersion) >= 0;
} }
public boolean canRunTask(Task task) public void setWorker(Worker newWorker)
{ {
return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity() final Worker oldWorker = worker.get();
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup())); Preconditions.checkArgument(newWorker.getHost().equals(oldWorker.getHost()), "Cannot change Worker host");
Preconditions.checkArgument(newWorker.getIp().equals(oldWorker.getIp()), "Cannot change Worker ip");
worker.set(newWorker);
} }
public void setLastCompletedTaskTime(DateTime completedTaskTime) public void setLastCompletedTaskTime(DateTime completedTaskTime)
{ {
lastCompletedTaskTime.getAndSet(completedTaskTime); lastCompletedTaskTime.set(completedTaskTime);
} }
public ImmutableZkWorker toImmutable() public ImmutableZkWorker toImmutable()
{ {
return new ImmutableZkWorker(worker, getCurrCapacityUsed(), getAvailabilityGroups()); return new ImmutableZkWorker(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups());
} }
@Override @Override

View File

@ -361,6 +361,29 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode()); Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
} }
@Test
public void testWorkerDisabled() throws Exception
{
doSetup();
final ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
// Disable while task running
disableWorker();
// Continue test
mockWorkerCompleteSuccessfulTask(task);
Assert.assertTrue(workerCompletedTask(result));
Assert.assertEquals(task.getId(), result.get().getId());
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
// Confirm RTR thinks the worker is disabled.
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion());
}
private void doSetup() throws Exception private void doSetup() throws Exception
{ {
makeWorker(); makeWorker();
@ -405,6 +428,14 @@ public class RemoteTaskRunnerTest
); );
} }
private void disableWorker() throws Exception
{
cf.setData().forPath(
announcementsPath,
jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""))
);
}
private boolean taskAnnounced(final String taskId) private boolean taskAnnounced(final String taskId)
{ {
return pathExists(joiner.join(tasksPath, taskId)); return pathExists(joiner.join(tasksPath, taskId));