1) remove retry logic from RTR

2) simplify configs
3) introduce task resource
4) make worker versions match coordinator version by default
This commit is contained in:
fjy 2013-07-12 12:51:12 -07:00
parent a190527731
commit b9578a1ada
26 changed files with 413 additions and 342 deletions

View File

@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap;
*/
public class DruidServer implements Comparable
{
public static final String DEFAULT_TIER = "_default_tier";
private static final Logger log = new Logger(DruidServer.class);
private final Object lock = new Object();

View File

@ -37,6 +37,6 @@ public abstract class DruidServerConfig
public abstract long getMaxSize();
@Config("druid.server.tier")
@Default("_default_tier")
@Default(DruidServer.DEFAULT_TIER)
public abstract String getTier();
}

View File

@ -43,7 +43,7 @@ public abstract class AbstractTask implements Task
private final String groupId;
@JsonIgnore
private final String availabilityGroup;
private final TaskResource taskResource;
@JsonIgnore
private final String dataSource;
@ -53,23 +53,23 @@ public abstract class AbstractTask implements Task
protected AbstractTask(String id, String dataSource, Interval interval)
{
this(id, id, id, dataSource, interval);
this(id, id, new TaskResource(id, 1), dataSource, interval);
}
protected AbstractTask(String id, String groupId, String dataSource, Interval interval)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.availabilityGroup = id;
this.taskResource = new TaskResource(id, 1);
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Optional.fromNullable(interval);
}
protected AbstractTask(String id, String groupId, String availabilityGroup, String dataSource, Interval interval)
protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource, Interval interval)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.availabilityGroup = Preconditions.checkNotNull(availabilityGroup, "availabilityGroup");
this.taskResource = Preconditions.checkNotNull(taskResource, "taskResource");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Optional.fromNullable(interval);
}
@ -90,9 +90,9 @@ public abstract class AbstractTask implements Task
@JsonProperty
@Override
public String getAvailabilityGroup()
public TaskResource getTaskResource()
{
return availabilityGroup;
return taskResource;
}
@Override
@ -172,19 +172,16 @@ public abstract class AbstractTask implements Task
AbstractTask that = (AbstractTask) o;
if (dataSource != null ? !dataSource.equals(that.dataSource) : that.dataSource != null) {
return false;
}
if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) {
return false;
}
if (id != null ? !id.equals(that.id) : that.id != null) {
return false;
}
if (interval != null ? !interval.equals(that.interval) : that.interval != null) {
if (!id.equals(that.id)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return id.hashCode();
}
}

View File

@ -88,7 +88,6 @@ public class IndexDeterminePartitionsTask extends AbstractTask
super(
id != null ? id : makeTaskId(groupId, interval.getStart(), interval.getEnd()),
groupId,
makeTaskId(groupId, interval.getStart(), interval.getEnd()),
schema.getDataSource(),
Preconditions.checkNotNull(interval, "interval")
);

View File

@ -105,7 +105,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonCreator
public RealtimeIndexTask(
@JsonProperty("id") String id,
@JsonProperty("availabilityGroup") String availabilityGroup,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("schema") Schema schema,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@ -121,9 +121,15 @@ public class RealtimeIndexTask extends AbstractTask
"index_realtime_%s",
schema.getDataSource()
),
availabilityGroup != null
? availabilityGroup
: makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()),
taskResource != null
? taskResource
: new TaskResource(
makeTaskId(
schema.getDataSource(),
schema.getShardSpec().getPartitionNum(),
new DateTime().toString()
), 1
),
schema.getDataSource(),
null
);

View File

@ -71,12 +71,7 @@ public interface Task
*/
public String getGroupId();
/**
* Returns availability group ID of this task. Tasks the same availability group cannot be assigned to the same
* worker. If tasks do not have this restriction, a common convention is to set the availability group ID to the
* task ID.
*/
public String getAvailabilityGroup();
public TaskResource getTaskResource();
/**
* Returns a descriptive label for this task type. Used for metrics emission and logging.

View File

@ -0,0 +1,39 @@
package com.metamx.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class TaskResource
{
private final String availabilityGroup;
private final int capacity;
@JsonCreator
public TaskResource(
@JsonProperty("availabilityGroup") String availabilityGroup,
@JsonProperty("capacity") int capacity
)
{
this.availabilityGroup = availabilityGroup;
this.capacity = capacity;
}
/**
* Returns availability group ID of this task. Tasks the same availability group cannot be assigned to the same
* worker. If tasks do not have this restriction, a common convention is to set the availability group ID to the
* task ID.
*/
@JsonProperty
public String getAvailabilityGroup()
{
return availabilityGroup;
}
@JsonProperty
public int getCapacity()
{
return capacity;
}
}

View File

@ -104,7 +104,7 @@ public class VersionConverterTask extends AbstractTask
DataSegment segment
)
{
super(id, groupId, id, dataSource, interval);
super(id, groupId, dataSource, interval);
this.segment = segment;
}
@ -205,13 +205,6 @@ public class VersionConverterTask extends AbstractTask
segment.getShardSpec().getPartitionNum()
),
groupId,
joinId(
groupId,
"sub",
segment.getInterval().getStart(),
segment.getInterval().getEnd(),
segment.getShardSpec().getPartitionNum()
),
segment.getDataSource(),
segment.getInterval()
);

View File

@ -94,6 +94,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
this.jsonMapper = jsonMapper;
}
@Override
public void bootstrap(List<Task> tasks)
{
// do nothing
}
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{

View File

@ -24,22 +24,17 @@ import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.InputSupplier;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.indexing.common.RetryPolicy;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
@ -49,6 +44,8 @@ import com.metamx.druid.indexing.worker.Worker;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import com.metamx.http.client.response.ToStringResponseHandler;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@ -56,25 +53,22 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -96,15 +90,13 @@ import java.util.concurrent.atomic.AtomicReference;
public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
{
private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
private static final ToStringResponseHandler STRING_RESPONSE_HANDLER = new ToStringResponseHandler(Charsets.UTF_8);
private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8);
private static final Joiner JOINER = Joiner.on("/");
private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final CuratorFramework cf;
private final PathChildrenCache workerPathCache;
private final ScheduledExecutorService scheduledExec;
private final RetryPolicyFactory retryPolicyFactory;
private final AtomicReference<WorkerSetupData> workerSetupData;
private final HttpClient httpClient;
@ -114,8 +106,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private final TaskRunnerWorkQueue runningTasks = new TaskRunnerWorkQueue();
// tasks that have not yet run
private final TaskRunnerWorkQueue pendingTasks = new TaskRunnerWorkQueue();
// idempotent task retry
private final Set<String> tasksToRetry = new ConcurrentSkipListSet<String>();
private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor();
@ -128,8 +118,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
RemoteTaskRunnerConfig config,
CuratorFramework cf,
PathChildrenCache workerPathCache,
ScheduledExecutorService scheduledExec,
RetryPolicyFactory retryPolicyFactory,
AtomicReference<WorkerSetupData> workerSetupData,
HttpClient httpClient
)
@ -138,8 +126,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
this.config = config;
this.cf = cf;
this.workerPathCache = workerPathCache;
this.scheduledExec = scheduledExec;
this.retryPolicyFactory = retryPolicyFactory;
this.workerSetupData = workerSetupData;
this.httpClient = httpClient;
}
@ -147,43 +133,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
@LifecycleStart
public void start()
{
try {
if (started) {
return;
}
// Add listener for creation/deletion of workers
workerPathCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
{
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
final Worker worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("Worker[%s] reportin' for duty!", worker.getHost());
addWorker(worker);
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
final Worker worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("Kaboom! Worker[%s] removed!", worker.getHost());
removeWorker(worker);
}
}
}
);
workerPathCache.start();
started = true;
}
catch (Exception e) {
throw Throwables.propagate(e);
if (started) {
return;
}
started = true;
}
@LifecycleStop
@ -197,6 +151,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
for (ZkWorker zkWorker : zkWorkers.values()) {
zkWorker.close();
}
workerPathCache.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -227,18 +182,71 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
public ZkWorker findWorkerRunningTask(String taskId)
{
for (ZkWorker zkWorker : zkWorkers.values()) {
if (zkWorker.getRunningTasks().contains(taskId)) {
if (zkWorker.isRunningTask(taskId)) {
return zkWorker;
}
}
return null;
}
public boolean isWorkerRunningTask(String workerHost, String taskId)
public boolean isWorkerRunningTask(Worker worker, Task task)
{
ZkWorker zkWorker = zkWorkers.get(workerHost);
ZkWorker zkWorker = zkWorkers.get(worker.getHost());
return (zkWorker != null && zkWorker.getRunningTasks().contains(taskId));
return (zkWorker != null && zkWorker.isRunningTask(task.getId()));
}
@Override
public void bootstrap(List<Task> tasks)
{
try {
Map<String, ZkWorker> existingTasks = Maps.newHashMap();
Set<String> existingWorkers = Sets.newHashSet(cf.getChildren().forPath(config.getIndexerAnnouncementPath()));
for (String existingWorker : existingWorkers) {
Worker worker = jsonMapper.readValue(
cf.getData()
.forPath(
JOINER.join(
config.getIndexerAnnouncementPath(),
existingWorker
)
), Worker.class
);
ZkWorker zkWorker = addWorker(worker);
List<String> runningTasks = cf.getChildren()
.forPath(JOINER.join(config.getIndexerStatusPath(), existingWorker));
for (String runningTask : runningTasks) {
existingTasks.put(runningTask, zkWorker);
}
}
// initialize data structures
for (Task task : tasks) {
TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(
task,
SettableFuture.<TaskStatus>create(),
new DateTime()
);
ZkWorker zkWorker = existingTasks.remove(task.getId());
if (zkWorker != null) {
runningTasks.put(task.getId(), taskRunnerWorkItem);
zkWorker.addRunningTask(task);
} else {
addPendingTask(taskRunnerWorkItem);
}
}
// shutdown any tasks that we don't know about
for (String existingTask : existingTasks.keySet()) {
shutdown(existingTask);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
init();
}
/**
@ -253,7 +261,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
throw new ISE("Assigned a task[%s] that is already running or pending, WTF is happening?!", task.getId());
}
TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(
task, SettableFuture.<TaskStatus>create(), retryPolicyFactory.makeRetryPolicy(), new DateTime()
task, SettableFuture.<TaskStatus>create(), new DateTime()
);
addPendingTask(taskRunnerWorkItem);
return taskRunnerWorkItem.getResult();
@ -262,7 +270,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
/**
* Finds the worker running the task and forwards the shutdown signal to the worker.
*
* @param taskId
* @param taskId - task id to shutdown
*/
@Override
public void shutdown(String taskId)
@ -275,40 +283,30 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
if (zkWorker == null) {
// Would be nice to have an ability to shut down pending tasks
log.info("Can't shutdown! No worker running task %s", taskId);
return;
}
final RetryPolicy shutdownRetryPolicy = retryPolicyFactory.makeRetryPolicy();
final URL url = workerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId));
try {
final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId));
final StatusResponseHolder response = httpClient.post(url)
.go(RESPONSE_HANDLER)
.get();
while (!shutdownRetryPolicy.hasExceededRetryThreshold()) {
try {
final String response = httpClient.post(url)
.go(STRING_RESPONSE_HANDLER)
.get();
log.info("Sent shutdown message to worker: %s, response: %s", zkWorker.getWorker().getHost(), response);
log.info(
"Sent shutdown message to worker: %s, status %s, response: %s",
zkWorker.getWorker().getHost(),
response.getStatus(),
response.getContent()
);
return;
}
catch (Exception e) {
log.error(e, "Exception shutting down taskId: %s", taskId);
if (shutdownRetryPolicy.hasExceededRetryThreshold()) {
throw Throwables.propagate(e);
} else {
try {
final long sleepTime = shutdownRetryPolicy.getAndIncrementRetryDelay().getMillis();
log.info("Will try again in %s.", new Duration(sleepTime).toString());
Thread.sleep(sleepTime);
}
catch (InterruptedException e2) {
throw Throwables.propagate(e2);
}
}
if (!response.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
throw new ISE("Shutdown failed");
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
@ -321,7 +319,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
return Optional.absent();
} else {
// Worker is still running this task
final URL url = workerURL(zkWorker.getWorker(), String.format("/task/%s/log?offset=%d", taskId, offset));
final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/log?offset=%d", taskId, offset));
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
{
@ -347,7 +345,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
}
}
private URL workerURL(Worker worker, String path)
private URL makeWorkerURL(Worker worker, String path)
{
Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path);
@ -403,42 +401,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
);
}
/**
* Retries a task by inserting it back into the pending queue after a given delay.
*
* @param taskRunnerWorkItem - the task to retry
*/
private void retryTask(final TaskRunnerWorkItem taskRunnerWorkItem)
{
final String taskId = taskRunnerWorkItem.getTask().getId();
if (tasksToRetry.contains(taskId)) {
return;
}
tasksToRetry.add(taskId);
if (!taskRunnerWorkItem.getRetryPolicy().hasExceededRetryThreshold()) {
log.info("Retry scheduled in %s for %s", taskRunnerWorkItem.getRetryPolicy().getRetryDelay(), taskId);
scheduledExec.schedule(
new Runnable()
{
@Override
public void run()
{
runningTasks.remove(taskId);
tasksToRetry.remove(taskId);
addPendingTask(taskRunnerWorkItem);
}
},
taskRunnerWorkItem.getRetryPolicy().getAndIncrementRetryDelay().getMillis(),
TimeUnit.MILLISECONDS
);
} else {
log.makeAlert("Task exceeded retry threshold").addData("task", taskId).emit();
}
}
/**
* Removes a task from the running queue and clears out the ZK status path of the task.
*
@ -468,17 +430,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
{
try {
final String taskId = taskRunnerWorkItem.getTask().getId();
ZkWorker zkWorker = findWorkerRunningTask(taskId);
// If a worker is already running this task, we don't need to announce it
if (zkWorker != null) {
final Worker worker = zkWorker.getWorker();
log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskId);
runningTasks.put(taskId, pendingTasks.remove(taskId));
log.info("Task %s switched from pending to running", taskId);
if (runningTasks.containsKey(taskId)) {
log.info("Task[%s] already running.", taskId);
} else {
// Nothing running this task, announce it in ZK for a worker to run it
zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask());
ZkWorker zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask());
if (zkWorker != null) {
announceTask(zkWorker.getWorker(), taskRunnerWorkItem);
}
@ -525,7 +482,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
Stopwatch timeoutStopwatch = new Stopwatch();
timeoutStopwatch.start();
synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker.getHost(), task.getId())) {
while (!isWorkerRunningTask(theWorker, task)) {
statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis());
if (timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS) >= config.getTaskAssignmentTimeoutDuration().getMillis()) {
log.error(
@ -534,13 +491,52 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
task.getId(),
config.getTaskAssignmentTimeoutDuration()
);
retryTask(runningTasks.get(task.getId()));
failTask(taskRunnerWorkItem);
break;
}
}
}
}
private void init()
{
try {
// Add listener for creation/deletion of workers
workerPathCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
{
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
final Worker worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("Worker[%s] reportin' for duty!", worker.getHost());
ZkWorker zkWorker = addWorker(worker);
initWorker(zkWorker);
runPendingTasks();
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
final Worker worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("Kaboom! Worker[%s] removed!", worker.getHost());
removeWorker(worker);
}
}
}
);
workerPathCache.start();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
/**
* When a new worker appears, listeners are registered for status changes associated with tasks assigned to
* the worker. Status changes indicate the creation or completion of a task.
@ -548,19 +544,30 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
*
* @param worker - contains metadata for a worker that has appeared in ZK
*/
private void addWorker(final Worker worker)
private ZkWorker addWorker(final Worker worker)
{
try {
final String workerStatusPath = JOINER.join(config.getIndexerStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
final ZkWorker zkWorker = new ZkWorker(
worker,
statusCache,
jsonMapper
statusCache
);
zkWorkers.put(worker.getHost(), zkWorker);
return zkWorker;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
private void initWorker(final ZkWorker zkWorker)
{
try {
// Add status listener to the watcher for status changes
statusCache.getListenable().addListener(
zkWorker.addListener(
new PathChildrenCacheListener()
{
@Override
@ -575,20 +582,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
event.getData().getData(), TaskStatus.class
);
// This can fail if a worker writes a bogus status. Retry if so.
if (!taskStatus.getId().equals(taskId)) {
retryTask(runningTasks.get(taskId));
return;
}
log.info(
"Worker[%s] wrote %s status for task: %s",
worker.getHost(),
zkWorker.getWorker().getHost(),
taskStatus.getStatusCode(),
taskId
);
// Synchronizing state with ZK
statusLock.notify();
@ -596,9 +596,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (taskRunnerWorkItem == null) {
log.warn(
"WTF?! Worker[%s] announcing a status for a task I didn't know about: %s",
worker.getHost(),
zkWorker.getWorker().getHost(),
taskId
);
} else {
zkWorker.addRunningTask(taskRunnerWorkItem.getTask());
}
if (taskStatus.isComplete()) {
@ -607,11 +609,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (result != null) {
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
zkWorker.removeRunningTask(taskRunnerWorkItem.getTask());
}
// Worker is done with this task
zkWorker.setLastCompletedTaskTime(new DateTime());
cleanup(worker.getHost(), taskId);
cleanup(zkWorker.getWorker().getHost(), taskId);
runPendingTasks();
}
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
@ -619,13 +623,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task %s just disappeared!", taskId);
retryTask(taskRunnerWorkItem);
failTask(taskRunnerWorkItem);
}
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle new worker status")
.addData("worker", worker.getHost())
.addData("worker", zkWorker.getWorker().getHost())
.addData("znode", event.getData().getPath())
.emit();
}
@ -633,10 +637,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
}
}
);
zkWorkers.put(worker.getHost(), zkWorker);
statusCache.start();
runPendingTasks();
zkWorker.start();
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -672,7 +674,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
retryTask(taskRunnerWorkItem);
failTask(taskRunnerWorkItem);
} else {
log.warn("RemoteTaskRunner has no knowledge of task %s", taskId);
}
@ -691,48 +693,19 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private ZkWorker findWorkerForTask(final Task task)
{
try {
final MinMaxPriorityQueue<ZkWorker> workerQueue = MinMaxPriorityQueue.<ZkWorker>orderedBy(
new Comparator<ZkWorker>()
{
@Override
public int compare(ZkWorker w1, ZkWorker w2)
{
return -Ints.compare(w1.getRunningTasks().size(), w2.getRunningTasks().size());
}
}
).create(
FunctionalIterable.create(zkWorkers.values()).filter(
new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker input)
{
for (String taskId : input.getRunningTasks()) {
TaskRunnerWorkItem workerTask = runningTasks.get(taskId);
if (workerTask != null && task.getAvailabilityGroup()
.equalsIgnoreCase(workerTask.getTask().getAvailabilityGroup())) {
return false;
}
}
return (!input.isAtCapacity() &&
input.getWorker()
.getVersion()
.compareTo(workerSetupData.get().getMinVersion()) >= 0);
}
}
)
);
if (workerQueue.isEmpty()) {
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return null;
for (ZkWorker zkWorker : zkWorkers.values()) {
if (zkWorker.canRunTask(task) &&
zkWorker.getWorker().getVersion().compareTo(workerSetupData.get().getMinVersion()) >= 0) {
return zkWorker;
}
}
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return null;
}
return workerQueue.peek();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
private void failTask(TaskRunnerWorkItem taskRunnerWorkItem)
{
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
((SettableFuture<TaskStatus>) result).set(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
}
}

View File

@ -24,6 +24,7 @@ import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import java.util.Collection;
import java.util.List;
/**
* Interface for handing off tasks. Used by a {@link com.metamx.druid.indexing.coordinator.exec.TaskConsumer} to
@ -31,6 +32,8 @@ import java.util.Collection;
*/
public interface TaskRunner
{
public void bootstrap(List<Task> tasks);
/**
* Run a task. The returned status should be some kind of completed status.
*

View File

@ -35,7 +35,6 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
{
private final Task task;
private final ListenableFuture<TaskStatus> result;
private final RetryPolicy retryPolicy;
private final DateTime createdTime;
private volatile DateTime queueInsertionTime;
@ -43,13 +42,11 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
public TaskRunnerWorkItem(
Task task,
ListenableFuture<TaskStatus> result,
RetryPolicy retryPolicy,
DateTime createdTime
)
{
this.task = task;
this.result = result;
this.retryPolicy = retryPolicy;
this.createdTime = createdTime;
}
@ -64,11 +61,6 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
return result;
}
public RetryPolicy getRetryPolicy()
{
return retryPolicy;
}
@JsonProperty
public DateTime getCreatedTime()
{
@ -102,7 +94,6 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
return "TaskRunnerWorkItem{" +
"task=" + task +
", result=" + result +
", retryPolicy=" + retryPolicy +
", createdTime=" + createdTime +
'}';
}

View File

@ -77,13 +77,19 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
exec.shutdownNow();
}
@Override
public void bootstrap(List<Task> tasks)
{
// do nothing
}
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
final TaskToolbox toolbox = toolboxFactory.build(task);
final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox));
final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture, null, new DateTime());
final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture, new DateTime());
runningItems.add(taskRunnerWorkItem);
Futures.addCallback(
statusFuture, new FutureCallback<TaskStatus>()
@ -246,7 +252,6 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
return new TaskRunnerWorkItem(
task,
null,
null,
createdTime
);
}

View File

@ -24,17 +24,22 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.worker.Worker;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
@ -44,27 +49,28 @@ public class ZkWorker implements Closeable
{
private final Worker worker;
private final PathChildrenCache statusCache;
private final Function<ChildData, String> cacheConverter;
private final Object lock = new Object();
private final Map<String, Task> runningTasks = Maps.newHashMap();
private final Set<String> availabilityGroups = Sets.newHashSet();
private volatile int currCapacity = 0;
private volatile DateTime lastCompletedTaskTime = new DateTime();
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
public ZkWorker(Worker worker, PathChildrenCache statusCache)
{
this.worker = worker;
this.statusCache = statusCache;
this.cacheConverter = new Function<ChildData, String>()
{
@Override
public String apply(@Nullable ChildData input)
{
try {
return jsonMapper.readValue(input.getData(), TaskStatus.class).getId();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
};
}
public void start() throws Exception
{
statusCache.start();
}
public void addListener(PathChildrenCacheListener listener)
{
statusCache.getListenable().addListener(listener);
}
@JsonProperty
@ -74,14 +80,15 @@ public class ZkWorker implements Closeable
}
@JsonProperty
public Set<String> getRunningTasks()
public Collection<Task> getRunningTasks()
{
return Sets.newHashSet(
Lists.transform(
statusCache.getCurrentData(),
cacheConverter
)
);
return runningTasks.values();
}
@JsonProperty
public int getCurrCapacity()
{
return currCapacity;
}
@JsonProperty
@ -90,12 +97,47 @@ public class ZkWorker implements Closeable
return lastCompletedTaskTime;
}
@JsonProperty
public void addRunningTask(Task task)
{
synchronized (lock) {
runningTasks.put(task.getId(), task);
availabilityGroups.add(task.getTaskResource().getAvailabilityGroup());
currCapacity += task.getTaskResource().getCapacity();
}
}
public void addRunningTasks(Collection<Task> tasks)
{
for (Task task : tasks) {
addRunningTask(task);
}
}
public Task removeRunningTask(Task task)
{
synchronized (lock) {
currCapacity -= task.getTaskResource().getCapacity();
availabilityGroups.remove(task.getTaskResource().getAvailabilityGroup());
return runningTasks.remove(task.getId());
}
}
public boolean isRunningTask(String taskId)
{
return runningTasks.containsKey(taskId);
}
public boolean isAtCapacity()
{
return statusCache.getCurrentData().size() >= worker.getCapacity();
return currCapacity >= worker.getCapacity();
}
public boolean canRunTask(Task task)
{
return (worker.getCapacity() - currCapacity >= task.getTaskResource().getCapacity() && !availabilityGroups.contains(task.getTaskResource().getAvailabilityGroup()));
}
public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
lastCompletedTaskTime = completedTaskTime;
@ -111,8 +153,10 @@ public class ZkWorker implements Closeable
public String toString()
{
return "ZkWorker{" +
"worker=" + worker +
"runningTasks=" + runningTasks +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
", currCapacity=" + currCapacity +
", worker=" + worker +
'}';
}
}

View File

@ -21,6 +21,7 @@ package com.metamx.druid.indexing.coordinator.config;
import org.skife.config.Config;
import org.skife.config.Default;
import org.skife.config.DefaultNull;
/**
*/
@ -29,4 +30,8 @@ public abstract class EC2AutoScalingStrategyConfig
@Config("druid.indexer.worker.port")
@Default("8080")
public abstract String getWorkerPort();
@Config("druid.indexer.worker.version")
@DefaultNull
public abstract String getWorkerVersion();
}

View File

@ -637,29 +637,12 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
@Override
public TaskRunner build()
{
// Don't use scheduledExecutorFactory, since it's linked to the wrong lifecycle (global lifecycle instead
// of leadership lifecycle)
final ScheduledExecutorService retryScheduledExec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("RemoteRunnerRetryExec--%d")
.build()
);
final CuratorFramework curator = getCuratorFramework();
return new RemoteTaskRunner(
getJsonMapper(),
getConfigFactory().build(RemoteTaskRunnerConfig.class),
curator,
new PathChildrenCache(curator, indexerZkConfig.getIndexerAnnouncementPath(), true),
retryScheduledExec,
new RetryPolicyFactory(
getConfigFactory().buildWithReplacements(
RetryPolicyConfig.class,
ImmutableMap.of("base_path", "druid.indexing")
)
),
configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class),
httpClient
);

View File

@ -33,6 +33,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.druid.indexing.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.indexing.coordinator.setup.EC2NodeData;
import com.metamx.druid.indexing.coordinator.setup.GalaxyUserData;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.codec.binary.Base64;
@ -72,6 +73,11 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
WorkerSetupData setupData = workerSetupDataRef.get();
EC2NodeData workerConfig = setupData.getNodeData();
GalaxyUserData userData = setupData.getUserData();
if (config.getWorkerVersion() != null) {
userData = userData.withVersion(config.getWorkerVersion());
}
RunInstancesResult result = amazonEC2Client.runInstances(
new RunInstancesRequest(
workerConfig.getAmiId(),
@ -84,7 +90,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
.withUserData(
Base64.encodeBase64String(
jsonMapper.writeValueAsBytes(
setupData.getUserData()
userData
)
)
)

View File

@ -60,6 +60,11 @@ public class GalaxyUserData
return type;
}
public GalaxyUserData withVersion(String ver)
{
return new GalaxyUserData(env, ver, type);
}
@Override
public String toString()
{

View File

@ -30,6 +30,7 @@ import com.metamx.druid.indexing.common.actions.LockListAction;
import com.metamx.druid.indexing.common.actions.LockReleaseAction;
import com.metamx.druid.indexing.common.actions.SegmentInsertAction;
import com.metamx.druid.indexing.common.task.AbstractTask;
import com.metamx.druid.indexing.common.task.TaskResource;
import org.joda.time.Interval;
import org.junit.Assert;
@ -41,12 +42,12 @@ public class RealtimeishTask extends AbstractTask
{
public RealtimeishTask()
{
super("rt1", "rt", "rt1", "foo", null);
super("rt1", "rt", new TaskResource("rt1", 1), "foo", null);
}
public RealtimeishTask(String id, String groupId, String availGroup, String dataSource, Interval interval)
public RealtimeishTask(String id, String groupId, TaskResource taskResource, String dataSource, Interval interval)
{
super(id, groupId, availGroup, dataSource, interval);
super(id, groupId, taskResource, dataSource, interval);
}
@Override

View File

@ -18,6 +18,8 @@ import com.metamx.druid.indexing.common.TaskToolboxFactory;
import com.metamx.druid.indexing.common.config.IndexerZkConfig;
import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.task.TaskResource;
import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.indexing.worker.Worker;
@ -66,8 +68,6 @@ public class RemoteTaskRunnerTest
private RemoteTaskRunner remoteTaskRunner;
private WorkerTaskMonitor workerTaskMonitor;
private ScheduledExecutorService scheduledExec;
private TestTask task1;
private Worker worker1;
@ -249,22 +249,26 @@ public class RemoteTaskRunnerTest
Assert.assertTrue("TaskCallback was not called!", callbackCalled.booleanValue());
}
@Test
public void testRunSameAvailabilityGroup() throws Exception
{
TestRealtimeTask theTask = new TestRealtimeTask("rt1", "rt1", "foo", TaskStatus.running("rt1"));
TestRealtimeTask theTask = new TestRealtimeTask(
"rt1",
new TaskResource("rt1", 1),
"foo",
TaskStatus.running("rt1")
);
remoteTaskRunner.run(theTask);
remoteTaskRunner.run(
new TestRealtimeTask("rt2", "rt1", "foo", TaskStatus.running("rt2"))
new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"))
);
remoteTaskRunner.run(
new TestRealtimeTask("rt3", "rt2", "foo", TaskStatus.running("rt3"))
new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"))
);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (remoteTaskRunner.getRunningTasks().isEmpty()) {
while (remoteTaskRunner.getRunningTasks().size() < 2) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
@ -276,6 +280,36 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2"));
}
@Test
public void testRunWithCapacity() throws Exception
{
TestRealtimeTask theTask = new TestRealtimeTask(
"rt1",
new TaskResource("rt1", 1),
"foo",
TaskStatus.running("rt1")
);
remoteTaskRunner.run(theTask);
remoteTaskRunner.run(
new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"))
);
remoteTaskRunner.run(
new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"))
);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (remoteTaskRunner.getRunningTasks().size() < 2) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2"));
}
private void makeTaskMonitor() throws Exception
{
@ -361,21 +395,18 @@ public class RemoteTaskRunnerTest
private void makeRemoteTaskRunner() throws Exception
{
scheduledExec = EasyMock.createMock(ScheduledExecutorService.class);
remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
new TestRemoteTaskRunnerConfig(),
cf,
pathChildrenCache,
scheduledExec,
new RetryPolicyFactory(new TestRetryPolicyConfig()),
new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, 1, null, null)),
null
);
// Create a single worker and wait for things for be ready
remoteTaskRunner.start();
remoteTaskRunner.bootstrap(Lists.<Task>newArrayList());
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
String.format("%s/worker1", announcementsPath),
jsonMapper.writeValueAsBytes(worker1)
@ -390,27 +421,6 @@ public class RemoteTaskRunnerTest
}
}
private static class TestRetryPolicyConfig extends RetryPolicyConfig
{
@Override
public Duration getRetryMinDuration()
{
return null;
}
@Override
public Duration getRetryMaxDuration()
{
return null;
}
@Override
public long getMaxRetryCount()
{
return 0;
}
}
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
@Override

View File

@ -36,6 +36,7 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexing.common.task.TaskResource;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
@ -284,7 +285,7 @@ public class TaskLifecycleTest
@Test
public void testSimple() throws Exception
{
final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D"))
final Task task = new AbstractTask("id1", "id1", new TaskResource("id1", 1), "ds", new Interval("2012-01-01/P1D"))
{
@Override
public String getType()
@ -321,7 +322,7 @@ public class TaskLifecycleTest
@Test
public void testBadInterval() throws Exception
{
final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D"))
final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
{
@Override
public String getType()
@ -355,7 +356,7 @@ public class TaskLifecycleTest
@Test
public void testBadVersion() throws Exception
{
final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D"))
final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
{
@Override
public String getType()

View File

@ -346,7 +346,7 @@ public class TaskQueueTest
private static Task newTask(final String id, final String groupId, final String dataSource, final Interval interval)
{
return new AbstractTask(id, groupId, id, dataSource, interval)
return new AbstractTask(id, groupId, dataSource, interval)
{
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
@ -370,7 +370,7 @@ public class TaskQueueTest
final List<Task> nextTasks
)
{
return new AbstractTask(id, groupId, id, dataSource, interval)
return new AbstractTask(id, groupId, dataSource, interval)
{
@Override
public String getType()

View File

@ -27,6 +27,7 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
import com.metamx.druid.indexing.common.task.RealtimeIndexTask;
import com.metamx.druid.indexing.common.task.TaskResource;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
@ -40,14 +41,14 @@ public class TestRealtimeTask extends RealtimeIndexTask
@JsonCreator
public TestRealtimeTask(
@JsonProperty("id") String id,
@JsonProperty("availabilityGroup") String availGroup,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("taskStatus") TaskStatus status
)
{
super(
id,
availGroup,
taskResource,
new Schema(dataSource, null, new AggregatorFactory[]{}, QueryGranularity.NONE, new NoneShardSpec()),
null,
null,

View File

@ -84,6 +84,12 @@ public class EC2AutoScalingStrategyTest
{
return "8080";
}
@Override
public String getWorkerVersion()
{
return "";
}
},
workerSetupData
);

View File

@ -42,6 +42,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@ -128,7 +129,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -156,7 +157,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -172,7 +173,7 @@ public class SimpleResourceManagementStrategyTest
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -213,7 +214,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -231,7 +232,7 @@ public class SimpleResourceManagementStrategyTest
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -265,7 +266,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -295,7 +296,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -310,7 +311,7 @@ public class SimpleResourceManagementStrategyTest
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -334,18 +335,18 @@ public class SimpleResourceManagementStrategyTest
Task testTask
)
{
super(new Worker("host", "ip", 3, "version"), null, null);
super(new Worker("host", "ip", 3, "version"), null);
this.testTask = testTask;
}
@Override
public Set<String> getRunningTasks()
public Collection<Task> getRunningTasks()
{
if (testTask == null) {
return Sets.newHashSet();
}
return Sets.newHashSet(testTask.getId());
return Sets.newHashSet(testTask);
}
}
}

View File

@ -65,7 +65,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.7.1</version>
<version>0.7.2</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>