This commit is contained in:
Fangjin Yang 2013-02-13 16:04:43 -08:00
parent c7b4973700
commit 1963bfe83f
8 changed files with 76 additions and 101 deletions

View File

@ -124,7 +124,7 @@ public class LocalTaskRunner implements TaskRunner
}
@Override
public Collection<WorkerWrapper> getWorkers()
public Collection<ZkWorker> getWorkers()
{
return Lists.newArrayList();
}

View File

@ -89,7 +89,7 @@ public class RemoteTaskRunner implements TaskRunner
private final WorkerSetupManager workerSetupManager;
// all workers that exist in ZK
private final Map<String, WorkerWrapper> zkWorkers = new ConcurrentHashMap<String, WorkerWrapper>();
private final Map<String, ZkWorker> zkWorkers = new ConcurrentHashMap<String, ZkWorker>();
// all tasks that have been assigned to a worker
private final TaskRunnerWorkQueue runningTasks = new TaskRunnerWorkQueue();
// tasks that have not yet run
@ -170,8 +170,8 @@ public class RemoteTaskRunner implements TaskRunner
return;
}
for (WorkerWrapper workerWrapper : zkWorkers.values()) {
workerWrapper.close();
for (ZkWorker zkWorker : zkWorkers.values()) {
zkWorker.close();
}
}
catch (Exception e) {
@ -183,7 +183,7 @@ public class RemoteTaskRunner implements TaskRunner
}
@Override
public Collection<WorkerWrapper> getWorkers()
public Collection<ZkWorker> getWorkers()
{
return zkWorkers.values();
}
@ -200,11 +200,11 @@ public class RemoteTaskRunner implements TaskRunner
return pendingTasks.values();
}
public WorkerWrapper findWorkerRunningTask(String taskId)
public ZkWorker findWorkerRunningTask(String taskId)
{
for (WorkerWrapper workerWrapper : zkWorkers.values()) {
if (workerWrapper.getRunningTasks().contains(taskId)) {
return workerWrapper;
for (ZkWorker zkWorker : zkWorkers.values()) {
if (zkWorker.getRunningTasks().contains(taskId)) {
return zkWorker;
}
}
return null;
@ -321,7 +321,7 @@ public class RemoteTaskRunner implements TaskRunner
/**
* Ensures no workers are already running a task before assigning the task to a worker.
* It is possible that a worker is running a task that the RTR has no knowledge of. This is possible when the RTR
* It is possible that a worker is running a task that the RTR has no knowledge of. This occurs when the RTR
* needs to bootstrap after a restart.
*
* @param taskRunnerWorkItem - the task to assign
@ -329,45 +329,20 @@ public class RemoteTaskRunner implements TaskRunner
private void assignTask(TaskRunnerWorkItem taskRunnerWorkItem)
{
try {
WorkerWrapper workerWrapper = findWorkerRunningTask(taskRunnerWorkItem.getTask().getId());
final String taskId = taskRunnerWorkItem.getTask().getId();
ZkWorker zkWorker = findWorkerRunningTask(taskId);
// If a worker is already running this task, we don't need to announce it
if (workerWrapper != null) {
final Worker worker = workerWrapper.getWorker();
log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskRunnerWorkItem.getTask().getId());
runningTasks.put(
taskRunnerWorkItem.getTask().getId(),
pendingTasks.remove(taskRunnerWorkItem.getTask().getId())
);
log.info("Task %s switched from pending to running", taskRunnerWorkItem.getTask().getId());
final ChildData statusData = workerWrapper.getStatusCache()
.getCurrentData(
JOINER.join(
config.getStatusPath(),
worker.getHost(),
taskRunnerWorkItem.getTask().getId()
)
);
final TaskStatus taskStatus = jsonMapper.readValue(
statusData.getData(),
TaskStatus.class
);
if (taskStatus.isComplete()) {
TaskCallback callback = taskRunnerWorkItem.getCallback();
if (callback != null) {
callback.notify(taskStatus);
}
cleanup(worker.getHost(), taskRunnerWorkItem.getTask().getId());
}
if (zkWorker != null) {
final Worker worker = zkWorker.getWorker();
log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskId);
runningTasks.put(taskId, pendingTasks.remove(taskId));
log.info("Task %s switched from pending to running", taskId);
} else {
// Nothing running this task, announce it in ZK for a worker to run it
workerWrapper = findWorkerForTask();
if (workerWrapper != null) {
announceTask(workerWrapper.getWorker(), taskRunnerWorkItem);
zkWorker = findWorkerForTask();
if (zkWorker != null) {
announceTask(zkWorker.getWorker(), taskRunnerWorkItem);
}
}
}
@ -391,7 +366,6 @@ public class RemoteTaskRunner implements TaskRunner
log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId());
byte[] rawBytes = jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext));
if (rawBytes.length > config.getMaxNumBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
}
@ -421,7 +395,7 @@ public class RemoteTaskRunner implements TaskRunner
/**
* When a new worker appears, listeners are registered for status changes associated with tasks assigned to
* the worker. Status changes indicate the creation or completion of task.
* the worker. Status changes indicate the creation or completion of a task.
* The RemoteTaskRunner updates state according to these changes.
*
* @param worker - contains metadata for a worker that has appeared in ZK
@ -431,7 +405,7 @@ public class RemoteTaskRunner implements TaskRunner
try {
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
final WorkerWrapper workerWrapper = new WorkerWrapper(
final ZkWorker zkWorker = new ZkWorker(
worker,
statusCache,
jsonMapper
@ -503,7 +477,7 @@ public class RemoteTaskRunner implements TaskRunner
}
// Worker is done with this task
workerWrapper.setLastCompletedTaskTime(new DateTime());
zkWorker.setLastCompletedTaskTime(new DateTime());
cleanup(worker.getHost(), taskId);
runPendingTasks();
}
@ -524,7 +498,7 @@ public class RemoteTaskRunner implements TaskRunner
}
}
);
zkWorkers.put(worker.getHost(), workerWrapper);
zkWorkers.put(worker.getHost(), zkWorker);
statusCache.start();
runPendingTasks();
@ -536,15 +510,15 @@ public class RemoteTaskRunner implements TaskRunner
/**
* When a ephemeral worker node disappears from ZK, incomplete running tasks will be retried by
* the logic in the status listener. We still have to make sure there are no tasks still assigned
* to the worker.
* the logic in the status listener. We still have to make sure there are no tasks assigned
* to the worker but not yet running.
*
* @param worker - the removed worker
*/
private void removeWorker(final Worker worker)
{
WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost());
if (workerWrapper != null) {
ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {
Set<String> tasksPending = Sets.newHashSet(
cf.getChildren()
@ -558,11 +532,11 @@ public class RemoteTaskRunner implements TaskRunner
cf.delete().guaranteed().forPath(JOINER.join(config.getTaskPath(), worker.getHost(), taskId));
retryTask(taskRunnerWorkItem, worker.getHost());
} else {
log.info("RemoteTaskRunner has no knowledge of pending task %s", taskId);
log.warn("RemoteTaskRunner has no knowledge of pending task %s", taskId);
}
}
workerWrapper.getStatusCache().close();
zkWorker.getStatusCache().close();
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -573,24 +547,24 @@ public class RemoteTaskRunner implements TaskRunner
}
}
private WorkerWrapper findWorkerForTask()
private ZkWorker findWorkerForTask()
{
try {
final MinMaxPriorityQueue<WorkerWrapper> workerQueue = MinMaxPriorityQueue.<WorkerWrapper>orderedBy(
new Comparator<WorkerWrapper>()
final MinMaxPriorityQueue<ZkWorker> workerQueue = MinMaxPriorityQueue.<ZkWorker>orderedBy(
new Comparator<ZkWorker>()
{
@Override
public int compare(WorkerWrapper w1, WorkerWrapper w2)
public int compare(ZkWorker w1, ZkWorker w2)
{
return -Ints.compare(w1.getRunningTasks().size(), w2.getRunningTasks().size());
}
}
).create(
FunctionalIterable.create(zkWorkers.values()).filter(
new Predicate<WorkerWrapper>()
new Predicate<ZkWorker>()
{
@Override
public boolean apply(WorkerWrapper input)
public boolean apply(ZkWorker input)
{
return (!input.isAtCapacity() &&
input.getWorker()

View File

@ -44,5 +44,5 @@ public interface TaskRunner
public Collection<TaskRunnerWorkItem> getPendingTasks();
public Collection<WorkerWrapper> getWorkers();
public Collection<ZkWorker> getWorkers();
}

View File

@ -36,8 +36,9 @@ import java.io.IOException;
import java.util.Set;
/**
* Holds information about a worker and a listener for task status changes associated with the worker.
*/
public class WorkerWrapper implements Closeable
public class ZkWorker implements Closeable
{
private final Worker worker;
private final PathChildrenCache statusCache;
@ -45,7 +46,7 @@ public class WorkerWrapper implements Closeable
private volatile DateTime lastCompletedTaskTime = new DateTime();
public WorkerWrapper(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
this.worker = worker;
this.statusCache = statusCache;
@ -108,7 +109,7 @@ public class WorkerWrapper implements Closeable
@Override
public String toString()
{
return "WorkerWrapper{" +
return "ZkWorker{" +
"worker=" + worker +
", statusCache=" + statusCache +
", cacheConverter=" + cacheConverter +

View File

@ -20,7 +20,7 @@
package com.metamx.druid.merger.coordinator.scaling;
import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.ZkWorker;
import java.util.Collection;
@ -30,9 +30,9 @@ import java.util.Collection;
*/
public interface ResourceManagementStrategy
{
public boolean doProvision(Collection<TaskRunnerWorkItem> runningTasks, Collection<WorkerWrapper> workerWrappers);
public boolean doProvision(Collection<TaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers);
public boolean doTerminate(Collection<TaskRunnerWorkItem> runningTasks, Collection<WorkerWrapper> workerWrappers);
public boolean doTerminate(Collection<TaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers);
public ScalingStats getStats();
}

View File

@ -25,7 +25,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.ZkWorker;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
@ -65,16 +65,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
@Override
public boolean doProvision(Collection<TaskRunnerWorkItem> pendingTasks, Collection<WorkerWrapper> workerWrappers)
public boolean doProvision(Collection<TaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
workerWrappers,
new Function<WorkerWrapper, String>()
zkWorkers,
new Function<ZkWorker, String>()
{
@Override
public String apply(WorkerWrapper input)
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}
@ -120,16 +120,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
@Override
public boolean doTerminate(Collection<TaskRunnerWorkItem> pendingTasks, Collection<WorkerWrapper> workerWrappers)
public boolean doTerminate(Collection<TaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
workerWrappers,
new Function<WorkerWrapper, String>()
zkWorkers,
new Function<ZkWorker, String>()
{
@Override
public String apply(WorkerWrapper input)
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}
@ -146,18 +146,18 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
if (nothingTerminating) {
final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers();
if (workerWrappers.size() <= minNumWorkers) {
if (zkWorkers.size() <= minNumWorkers) {
return false;
}
List<WorkerWrapper> thoseLazyWorkers = Lists.newArrayList(
List<ZkWorker> thoseLazyWorkers = Lists.newArrayList(
FunctionalIterable
.create(workerWrappers)
.create(zkWorkers)
.filter(
new Predicate<WorkerWrapper>()
new Predicate<ZkWorker>()
{
@Override
public boolean apply(WorkerWrapper input)
public boolean apply(ZkWorker input)
{
return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
@ -174,10 +174,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
AutoScalingData terminated = autoScalingStrategy.terminate(
Lists.transform(
thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size() - 1),
new Function<WorkerWrapper, String>()
new Function<ZkWorker, String>()
{
@Override
public String apply(WorkerWrapper input)
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}

View File

@ -247,7 +247,7 @@ public class RemoteTaskRunnerTest
while (remoteTaskRunner.findWorkerRunningTask(task1.getId()) != null) {
Thread.sleep(500);
if (count > 10) {
throw new ISE("WTF?! Task still not announced in ZK?");
throw new ISE("WTF?! Task still exists in ZK?");
}
count++;
}

View File

@ -27,7 +27,7 @@ import com.metamx.druid.merger.TestTask;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.ZkWorker;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.Worker;
@ -123,8 +123,8 @@ public class SimpleResourceManagementStrategyTest
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(testTask)
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
@ -151,8 +151,8 @@ public class SimpleResourceManagementStrategyTest
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(testTask)
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
@ -167,8 +167,8 @@ public class SimpleResourceManagementStrategyTest
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(testTask)
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
@ -201,8 +201,8 @@ public class SimpleResourceManagementStrategyTest
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(null)
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
);
@ -233,8 +233,8 @@ public class SimpleResourceManagementStrategyTest
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(null)
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
);
@ -248,8 +248,8 @@ public class SimpleResourceManagementStrategyTest
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(null)
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
);
@ -263,11 +263,11 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy);
}
private static class TestWorkerWrapper extends WorkerWrapper
private static class TestZkWorker extends ZkWorker
{
private final Task testTask;
private TestWorkerWrapper(
private TestZkWorker(
Task testTask
)
{