1) on bootstrap, load all initial data and do a compare with bootstrapped tasks, delete any that are extra out there

2) change autoscaling logic such that it only works with remote task runnrs
3) zk workers use their own status caches to determine what they are running
This commit is contained in:
fjy 2013-07-26 14:32:08 -07:00
parent a1262760b2
commit 4ae8395538
23 changed files with 455 additions and 318 deletions

View File

@ -21,8 +21,11 @@ package com.metamx.druid.curator.cache;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.ThreadUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
*/
@ -48,4 +51,43 @@ public class SimplePathChildrenCacheFactory implements PathChildrenCacheFactory
{
return new PathChildrenCache(curator, path, cacheData, compressed, exec);
}
public static class Builder
{
private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
private boolean cacheData;
private boolean compressed;
private ExecutorService exec;
public Builder()
{
cacheData = true;
compressed = false;
exec = Executors.newSingleThreadExecutor(defaultThreadFactory);
}
public Builder withCacheData(boolean cacheData)
{
this.cacheData = cacheData;
return this;
}
public Builder withCompressed(boolean compressed)
{
this.compressed = compressed;
return this;
}
public Builder withExecutorService(ExecutorService exec)
{
this.exec = exec;
return this;
}
public SimplePathChildrenCacheFactory build()
{
return new SimplePathChildrenCacheFactory(cacheData, compressed, exec);
}
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.metamx.druid.indexing.common.task.TaskResource;
/**
* Represents the status of a task. The task may be ongoing ({@link #isComplete()} false) or it may be
@ -42,33 +43,36 @@ public class TaskStatus
public static TaskStatus running(String taskId)
{
return new TaskStatus(taskId, Status.RUNNING, -1);
return new TaskStatus(taskId, Status.RUNNING, -1, null);
}
public static TaskStatus success(String taskId)
{
return new TaskStatus(taskId, Status.SUCCESS, -1);
return new TaskStatus(taskId, Status.SUCCESS, -1, null);
}
public static TaskStatus failure(String taskId)
{
return new TaskStatus(taskId, Status.FAILED, -1);
return new TaskStatus(taskId, Status.FAILED, -1, null);
}
private final String id;
private final Status status;
private final long duration;
private final TaskResource resource;
@JsonCreator
private TaskStatus(
@JsonProperty("id") String id,
@JsonProperty("status") Status status,
@JsonProperty("duration") long duration
@JsonProperty("duration") long duration,
@JsonProperty("resource") TaskResource resource
)
{
this.id = id;
this.status = status;
this.duration = duration;
this.resource = resource == null ? new TaskResource(id, 1) : resource;
// Check class invariants.
Preconditions.checkNotNull(id, "id");
@ -93,6 +97,12 @@ public class TaskStatus
return duration;
}
@JsonProperty("resource")
public TaskResource getResource()
{
return resource;
}
/**
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
* isSuccess, or isFailure will be true at any one time.
@ -134,7 +144,7 @@ public class TaskStatus
public TaskStatus withDuration(long _duration)
{
return new TaskStatus(id, status, _duration);
return new TaskStatus(id, status, _duration, resource);
}
@Override
@ -144,6 +154,7 @@ public class TaskStatus
.add("id", id)
.add("status", status)
.add("duration", duration)
.add("resource", resource)
.toString();
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.indexing.common.config;
import com.google.common.base.Joiner;
import org.skife.config.Config;
import org.skife.config.Default;
@ -26,15 +27,30 @@ import java.io.File;
public abstract class TaskConfig
{
private static Joiner joiner = Joiner.on("/");
@Config("druid.indexer.baseDir")
@Default("/tmp/")
public abstract String getBaseDir();
@Config("druid.indexer.taskDir")
@Default("/tmp/persistent/task")
public abstract File getBaseTaskDir();
public File getBaseTaskDir()
{
return new File(defaultPath("persistent/task"));
}
@Config("druid.indexer.hadoopWorkingPath")
public String getHadoopWorkingPath()
{
return defaultPath("druid-indexing");
}
@Config("druid.indexer.rowFlushBoundary")
@Default("500000")
public abstract int getDefaultRowFlushBoundary();
@Config("druid.indexer.hadoopWorkingPath")
@Default("/tmp/druid-indexing")
public abstract String getHadoopWorkingPath();
}
private String defaultPath(String subPath)
{
return joiner.join(getBaseDir(), subPath);
}
}

View File

@ -46,13 +46,11 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.realtime.FireDepartment;
import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.RealtimeMetricsMonitor;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.NoopRejectionPolicyFactory;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.plumber.RejectionPolicyFactory;
@ -115,22 +113,22 @@ public class RealtimeIndexTask extends AbstractTask
)
{
super(
id != null
? id
: makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()),
id == null
? makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString())
:id,
String.format(
"index_realtime_%s",
schema.getDataSource()
),
taskResource != null
? taskResource
: new TaskResource(
taskResource == null
? new TaskResource(
makeTaskId(
schema.getDataSource(),
schema.getShardSpec().getPartitionNum(),
new DateTime().toString()
), 1
),
)
: taskResource,
schema.getDataSource(),
null
);

View File

@ -71,6 +71,10 @@ public interface Task
*/
public String getGroupId();
/**
* Returns a {@link com.metamx.druid.indexing.common.task.TaskResource} for this task. Task resources define specific
* worker requirements a task may require.
*/
public TaskResource getTaskResource();
/**

View File

@ -8,16 +8,16 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class TaskResource
{
private final String availabilityGroup;
private final int capacity;
private final int requiredCapacity;
@JsonCreator
public TaskResource(
@JsonProperty("availabilityGroup") String availabilityGroup,
@JsonProperty("capacity") int capacity
@JsonProperty("requiredCapacity") int requiredCapacity
)
{
this.availabilityGroup = availabilityGroup;
this.capacity = capacity;
this.requiredCapacity = requiredCapacity;
}
/**
@ -31,9 +31,22 @@ public class TaskResource
return availabilityGroup;
}
/**
* Returns the number of worker slots this task will take.
*/
@JsonProperty
public int getCapacity()
public int getRequiredCapacity()
{
return capacity;
return requiredCapacity;
}
@Override
public String toString()
{
return "TaskResource{" +
"availabilityGroup='" + availabilityGroup + '\'' +
", requiredCapacity=" + requiredCapacity +
'}';
}
}

View File

@ -21,13 +21,13 @@ package com.metamx.druid.indexing.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.ListenableFuture;
@ -35,6 +35,7 @@ import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.cache.PathChildrenCacheFactory;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
@ -46,8 +47,8 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import com.metamx.http.client.response.ToStringResponseHandler;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
@ -96,6 +97,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final CuratorFramework cf;
private final PathChildrenCacheFactory pathChildrenCacheFactory;
private final PathChildrenCache workerPathCache;
private final AtomicReference<WorkerSetupData> workerSetupData;
private final HttpClient httpClient;
@ -103,9 +105,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
// all workers that exist in ZK
private final Map<String, ZkWorker> zkWorkers = new ConcurrentHashMap<String, ZkWorker>();
// all tasks that have been assigned to a worker
private final TaskRunnerWorkQueue runningTasks = new TaskRunnerWorkQueue();
private final RemoteTaskRunnerWorkQueue runningTasks = new RemoteTaskRunnerWorkQueue();
// tasks that have not yet run
private final TaskRunnerWorkQueue pendingTasks = new TaskRunnerWorkQueue();
private final RemoteTaskRunnerWorkQueue pendingTasks = new RemoteTaskRunnerWorkQueue();
private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor();
@ -117,7 +119,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
ObjectMapper jsonMapper,
RemoteTaskRunnerConfig config,
CuratorFramework cf,
PathChildrenCache workerPathCache,
PathChildrenCacheFactory pathChildrenCacheFactory,
AtomicReference<WorkerSetupData> workerSetupData,
HttpClient httpClient
)
@ -125,7 +127,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
this.jsonMapper = jsonMapper;
this.config = config;
this.cf = cf;
this.workerPathCache = workerPathCache;
this.pathChildrenCacheFactory = pathChildrenCacheFactory;
this.workerPathCache = pathChildrenCacheFactory.make(cf, config.getIndexerAnnouncementPath());
this.workerSetupData = workerSetupData;
this.httpClient = httpClient;
}
@ -168,13 +171,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
}
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
public Collection<RemoteTaskRunnerWorkItem> getRunningTasks()
{
return runningTasks.values();
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
public Collection<RemoteTaskRunnerWorkItem> getPendingTasks()
{
return pendingTasks.values();
}
@ -200,53 +203,73 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
public void bootstrap(List<Task> tasks)
{
try {
Map<String, ZkWorker> existingTasks = Maps.newHashMap();
Set<String> existingWorkers = Sets.newHashSet(cf.getChildren().forPath(config.getIndexerAnnouncementPath()));
for (String existingWorker : existingWorkers) {
Worker worker = jsonMapper.readValue(
cf.getData()
.forPath(
JOINER.join(
config.getIndexerAnnouncementPath(),
existingWorker
)
), Worker.class
// Add listener for creation/deletion of workers
workerPathCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
{
Worker worker;
switch (event.getType()) {
case CHILD_ADDED:
worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("Worker[%s] reportin' for duty!", worker.getHost());
addWorker(worker, PathChildrenCache.StartMode.NORMAL);
break;
case CHILD_REMOVED:
worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("Kaboom! Worker[%s] removed!", worker.getHost());
removeWorker(worker);
break;
default:
break;
}
}
}
);
workerPathCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
Set<String> existingTasks = Sets.newHashSet();
for (ChildData childData : workerPathCache.getCurrentData()) {
final Worker worker = jsonMapper.readValue(
childData.getData(),
Worker.class
);
ZkWorker zkWorker = addWorker(worker);
List<String> runningTasks = cf.getChildren()
.forPath(JOINER.join(config.getIndexerStatusPath(), existingWorker));
for (String runningTask : runningTasks) {
existingTasks.put(runningTask, zkWorker);
}
log.info("Worker[%s] reportin' for duty!", worker.getHost());
final ZkWorker zkWorker = addWorker(worker, PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
existingTasks.addAll(zkWorker.getRunningTasks().keySet());
}
// initialize data structures
for (Task task : tasks) {
TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(
task,
SettableFuture.<TaskStatus>create(),
new DateTime()
);
ZkWorker zkWorker = existingTasks.remove(task.getId());
if (zkWorker != null) {
runningTasks.put(task.getId(), taskRunnerWorkItem);
zkWorker.addRunningTask(task);
} else {
addPendingTask(taskRunnerWorkItem);
}
}
Set<String> bootstrappedTasks = Sets.newHashSet(
Lists.transform(
tasks,
new Function<Task, String>()
{
@Override
public String apply(Task input)
{
return input.getId();
}
}
)
);
// shutdown any tasks that we don't know about
for (String existingTask : existingTasks.keySet()) {
shutdown(existingTask);
for (String taskId : Sets.difference(existingTasks, bootstrappedTasks)) {
shutdown(taskId);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
init();
}
/**
@ -260,8 +283,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (runningTasks.containsKey(task.getId()) || pendingTasks.containsKey(task.getId())) {
throw new ISE("Assigned a task[%s] that is already running or pending, WTF is happening?!", task.getId());
}
TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(
task, SettableFuture.<TaskStatus>create(), new DateTime()
RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
task,
SettableFuture.<TaskStatus>create()
);
addPendingTask(taskRunnerWorkItem);
return taskRunnerWorkItem.getResult();
@ -301,7 +325,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
);
if (!response.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
throw new ISE("Shutdown failed");
log.error("Shutdown failed for %s! Are you sure the task was running?", taskId);
}
}
catch (Exception e) {
@ -362,7 +386,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
*
* @param taskRunnerWorkItem
*/
private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem)
private void addPendingTask(final RemoteTaskRunnerWorkItem taskRunnerWorkItem)
{
log.info("Added pending task %s", taskRunnerWorkItem.getTask().getId());
@ -386,8 +410,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
try {
// make a copy of the pending tasks because assignTask may delete tasks from pending and move them
// into running status
List<TaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
for (TaskRunnerWorkItem taskWrapper : copy) {
List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
for (RemoteTaskRunnerWorkItem taskWrapper : copy) {
assignTask(taskWrapper);
}
}
@ -426,12 +450,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
*
* @param taskRunnerWorkItem - the task to assign
*/
private void assignTask(TaskRunnerWorkItem taskRunnerWorkItem)
private void assignTask(RemoteTaskRunnerWorkItem taskRunnerWorkItem)
{
try {
final String taskId = taskRunnerWorkItem.getTask().getId();
if (runningTasks.containsKey(taskId)) {
if (runningTasks.containsKey(taskId) || findWorkerRunningTask(taskId) != null) {
log.info("Task[%s] already running.", taskId);
} else {
// Nothing running this task, announce it in ZK for a worker to run it
@ -453,7 +477,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
* @param theWorker The worker the task is assigned to
* @param taskRunnerWorkItem The task to be assigned
*/
private void announceTask(Worker theWorker, TaskRunnerWorkItem taskRunnerWorkItem) throws Exception
private void announceTask(Worker theWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
{
final Task task = taskRunnerWorkItem.getTask();
@ -492,51 +516,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
config.getTaskAssignmentTimeoutDuration()
);
failTask(taskRunnerWorkItem);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
break;
}
}
}
}
private void init()
{
try {
// Add listener for creation/deletion of workers
workerPathCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
{
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
final Worker worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("Worker[%s] reportin' for duty!", worker.getHost());
ZkWorker zkWorker = addWorker(worker);
initWorker(zkWorker);
runPendingTasks();
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
final Worker worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("Kaboom! Worker[%s] removed!", worker.getHost());
removeWorker(worker);
}
}
}
);
workerPathCache.start();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
/**
* When a new worker appears, listeners are registered for status changes associated with tasks assigned to
* the worker. Status changes indicate the creation or completion of a task.
@ -544,28 +530,17 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
*
* @param worker - contains metadata for a worker that has appeared in ZK
*/
private ZkWorker addWorker(final Worker worker)
private ZkWorker addWorker(final Worker worker, PathChildrenCache.StartMode startMode)
{
try {
final String workerStatusPath = JOINER.join(config.getIndexerStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
final ZkWorker zkWorker = new ZkWorker(
worker,
statusCache
statusCache,
jsonMapper
);
zkWorkers.put(worker.getHost(), zkWorker);
return zkWorker;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
private void initWorker(final ZkWorker zkWorker)
{
try {
// Add status listener to the watcher for status changes
zkWorker.addListener(
new PathChildrenCacheListener()
@ -573,58 +548,59 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
String taskId;
RemoteTaskRunnerWorkItem taskRunnerWorkItem;
synchronized (statusLock) {
try {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) ||
event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskStatus taskStatus = jsonMapper.readValue(
event.getData().getData(), TaskStatus.class
);
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskStatus taskStatus = jsonMapper.readValue(
event.getData().getData(), TaskStatus.class
);
log.info(
"Worker[%s] wrote %s status for task: %s",
zkWorker.getWorker().getHost(),
taskStatus.getStatusCode(),
taskId
);
// Synchronizing state with ZK
statusLock.notify();
final TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem == null) {
log.warn(
"WTF?! Worker[%s] announcing a status for a task I didn't know about: %s",
log.info(
"Worker[%s] wrote %s status for task: %s",
zkWorker.getWorker().getHost(),
taskStatus.getStatusCode(),
taskId
);
} else {
zkWorker.addRunningTask(taskRunnerWorkItem.getTask());
}
if (taskStatus.isComplete()) {
if (taskRunnerWorkItem != null) {
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
if (result != null) {
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
// Synchronizing state with ZK
statusLock.notify();
zkWorker.removeRunningTask(taskRunnerWorkItem.getTask());
taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem == null) {
log.warn(
"WTF?! Worker[%s] announcing a status for a task I didn't know about: %s",
zkWorker.getWorker().getHost(),
taskId
);
}
// Worker is done with this task
zkWorker.setLastCompletedTaskTime(new DateTime());
cleanup(zkWorker.getWorker().getHost(), taskId);
runPendingTasks();
}
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task %s just disappeared!", taskId);
failTask(taskRunnerWorkItem);
}
if (taskStatus.isComplete()) {
if (taskRunnerWorkItem != null) {
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
if (result != null) {
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
}
// Worker is done with this task
zkWorker.setLastCompletedTaskTime(new DateTime());
cleanup(zkWorker.getWorker().getHost(), taskId);
runPendingTasks();
}
break;
case CHILD_REMOVED:
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task %s just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
}
break;
}
}
catch (Exception e) {
@ -638,7 +614,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
}
);
zkWorker.start();
zkWorker.start(startMode);
zkWorkers.put(worker.getHost(), zkWorker);
return zkWorker;
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -657,35 +636,30 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {
Set<String> tasksToRetry = Sets.newHashSet(
cf.getChildren()
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
);
tasksToRetry.addAll(
cf.getChildren()
.forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost()))
);
log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size());
for (String taskId : tasksToRetry) {
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
for (String assignedTask : cf.getChildren()
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(assignedTask);
if (taskRunnerWorkItem != null) {
String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), taskId);
String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), assignedTask);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
failTask(taskRunnerWorkItem);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else {
log.warn("RemoteTaskRunner has no knowledge of task %s", taskId);
log.warn("RemoteTaskRunner has no knowledge of task %s", assignedTask);
}
}
zkWorker.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
try {
zkWorker.close();
}
catch (Exception e) {
log.error(e, "Exception closing worker %s!", worker.getHost());
}
zkWorkers.remove(worker.getHost());
}
}
@ -702,10 +676,4 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return null;
}
private void failTask(TaskRunnerWorkItem taskRunnerWorkItem)
{
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
((SettableFuture<TaskStatus>) result).set(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
}
}

View File

@ -0,0 +1,63 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
/**
*/
public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final SettableFuture<TaskStatus> result;
public RemoteTaskRunnerWorkItem(
Task task,
SettableFuture<TaskStatus> result
)
{
super(task, result);
this.result = result;
}
public RemoteTaskRunnerWorkItem(
Task task,
SettableFuture<TaskStatus> result,
DateTime createdTime,
DateTime queueInsertionTime
)
{
super(task, result, createdTime, queueInsertionTime);
this.result = result;
}
public void setResult(TaskStatus status)
{
result.set(status);
}
@Override
public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time);
}
}

View File

@ -25,10 +25,10 @@ import java.util.concurrent.ConcurrentSkipListMap;
/**
*/
public class TaskRunnerWorkQueue extends ConcurrentSkipListMap<String, TaskRunnerWorkItem>
public class RemoteTaskRunnerWorkQueue extends ConcurrentSkipListMap<String, RemoteTaskRunnerWorkItem>
{
@Override
public TaskRunnerWorkItem put(String s, TaskRunnerWorkItem taskRunnerWorkItem)
public RemoteTaskRunnerWorkItem put(String s, RemoteTaskRunnerWorkItem taskRunnerWorkItem)
{
return super.put(s, taskRunnerWorkItem.withQueueInsertionTime(new DateTime()));
}

View File

@ -21,12 +21,11 @@ package com.metamx.druid.indexing.coordinator;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.task.Task;
@ -34,6 +33,8 @@ import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.indexing.coordinator.exec.TaskConsumer;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import org.apache.curator.framework.CuratorFramework;
@ -88,7 +89,6 @@ public class TaskMasterLifecycle
log.info("By the power of Grayskull, I have the power!");
taskRunner = runnerFactory.build();
resourceManagementScheduler = managementSchedulerFactory.build(taskRunner);
final TaskConsumer taskConsumer = new TaskConsumer(
taskQueue,
taskRunner,
@ -106,7 +106,13 @@ public class TaskMasterLifecycle
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle);
leaderLifecycle.addManagedInstance(taskConsumer);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
if ("remote".equalsIgnoreCase(indexerCoordinatorConfig.getRunnerImpl())) {
if (!(taskRunner instanceof RemoteTaskRunner)) {
throw new ISE("WTF?! We configured a remote runner and got %s", taskRunner.getClass());
}
resourceManagementScheduler = managementSchedulerFactory.build((RemoteTaskRunner) taskRunner);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
}
try {
leaderLifecycle.start();

View File

@ -47,9 +47,9 @@ public interface TaskRunner
*/
public void shutdown(String taskid);
public Collection<TaskRunnerWorkItem> getRunningTasks();
public Collection<? extends TaskRunnerWorkItem> getRunningTasks();
public Collection<TaskRunnerWorkItem> getPendingTasks();
public Collection<? extends TaskRunnerWorkItem> getPendingTasks();
public Collection<ZkWorker> getWorkers();
}

View File

@ -22,7 +22,6 @@ package com.metamx.druid.indexing.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.druid.indexing.common.RetryPolicy;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
@ -39,15 +38,25 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
private volatile DateTime queueInsertionTime;
public TaskRunnerWorkItem(
Task task,
ListenableFuture<TaskStatus> result
)
{
this(task, result, new DateTime(), new DateTime());
}
public TaskRunnerWorkItem(
Task task,
ListenableFuture<TaskStatus> result,
DateTime createdTime
DateTime createdTime,
DateTime queueInsertionTime
)
{
this.task = task;
this.result = result;
this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime;
}
@JsonProperty
@ -75,8 +84,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
public TaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{
this.queueInsertionTime = time;
return this;
return new TaskRunnerWorkItem(task, result, createdTime, time);
}
@Override

View File

@ -89,7 +89,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
final TaskToolbox toolbox = toolboxFactory.build(task);
final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox));
final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture, new DateTime());
final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture);
runningItems.add(taskRunnerWorkItem);
Futures.addCallback(
statusFuture, new FutureCallback<TaskStatus>()
@ -190,14 +190,10 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
private final Task task;
private final TaskToolbox toolbox;
private final DateTime createdTime;
public ExecutorServiceTaskRunnerCallable(Task task, TaskToolbox toolbox)
{
this.task = task;
this.toolbox = toolbox;
this.createdTime = new DateTime();
}
@Override
@ -249,11 +245,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
public TaskRunnerWorkItem getTaskRunnerWorkItem()
{
return new TaskRunnerWorkItem(
task,
null,
createdTime
);
return new TaskRunnerWorkItem(task, null);
}
}
}

View File

@ -31,14 +31,11 @@ import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.worker.Worker;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
@ -49,23 +46,32 @@ public class ZkWorker implements Closeable
{
private final Worker worker;
private final PathChildrenCache statusCache;
private final Function<ChildData, TaskStatus> cacheConverter;
private final Object lock = new Object();
private final Map<String, Task> runningTasks = Maps.newHashMap();
private final Set<String> availabilityGroups = Sets.newHashSet();
private volatile int currCapacity = 0;
private volatile DateTime lastCompletedTaskTime = new DateTime();
public ZkWorker(Worker worker, PathChildrenCache statusCache)
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
this.worker = worker;
this.statusCache = statusCache;
this.cacheConverter = new Function<ChildData, TaskStatus>()
{
@Override
public TaskStatus apply(ChildData input)
{
try {
return jsonMapper.readValue(input.getData(), TaskStatus.class);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
};
}
public void start() throws Exception
public void start(PathChildrenCache.StartMode startMode) throws Exception
{
statusCache.start();
statusCache.start(startMode);
}
public void addListener(PathChildrenCacheListener listener)
@ -80,64 +86,61 @@ public class ZkWorker implements Closeable
}
@JsonProperty
public Collection<Task> getRunningTasks()
public Map<String, TaskStatus> getRunningTasks()
{
return runningTasks.values();
Map<String, TaskStatus> retVal = Maps.newHashMap();
for (TaskStatus taskStatus : Lists.transform(
statusCache.getCurrentData(),
cacheConverter
)) {
retVal.put(taskStatus.getId(), taskStatus);
}
return retVal;
}
@JsonProperty
@JsonProperty("currCapacity")
public int getCurrCapacity()
{
int currCapacity = 0;
for (TaskStatus taskStatus : getRunningTasks().values()) {
currCapacity += taskStatus.getResource().getRequiredCapacity();
}
return currCapacity;
}
@JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups()
{
Set<String> retVal = Sets.newHashSet();
for (TaskStatus taskStatus : getRunningTasks().values()) {
retVal.add(taskStatus.getResource().getAvailabilityGroup());
}
return retVal;
}
@JsonProperty
public DateTime getLastCompletedTaskTime()
{
return lastCompletedTaskTime;
}
public void addRunningTask(Task task)
{
synchronized (lock) {
runningTasks.put(task.getId(), task);
availabilityGroups.add(task.getTaskResource().getAvailabilityGroup());
currCapacity += task.getTaskResource().getCapacity();
}
}
public void addRunningTasks(Collection<Task> tasks)
{
for (Task task : tasks) {
addRunningTask(task);
}
}
public Task removeRunningTask(Task task)
{
synchronized (lock) {
currCapacity -= task.getTaskResource().getCapacity();
availabilityGroups.remove(task.getTaskResource().getAvailabilityGroup());
return runningTasks.remove(task.getId());
}
}
public boolean isRunningTask(String taskId)
{
return runningTasks.containsKey(taskId);
return getRunningTasks().containsKey(taskId);
}
public boolean isAtCapacity()
{
return currCapacity >= worker.getCapacity();
return getCurrCapacity() >= worker.getCapacity();
}
public boolean canRunTask(Task task)
{
return (worker.getCapacity() - currCapacity >= task.getTaskResource().getCapacity() && !availabilityGroups.contains(task.getTaskResource().getAvailabilityGroup()));
return (worker.getCapacity() - getCurrCapacity() >= task.getTaskResource().getRequiredCapacity()
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
}
public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
lastCompletedTaskTime = completedTaskTime;
@ -153,10 +156,8 @@ public class ZkWorker implements Closeable
public String toString()
{
return "ZkWorker{" +
"runningTasks=" + runningTasks +
"worker=" + worker +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
", currCapacity=" + currCapacity +
", worker=" + worker +
'}';
}
}

View File

@ -31,4 +31,8 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
@Config("druid.indexer.taskAssignmentTimeoutDuration")
@Default("PT5M")
public abstract Duration getTaskAssignmentTimeoutDuration();
@Config("druid.curator.compression.enable")
@Default("false")
public abstract boolean enableCompression();
}

View File

@ -27,7 +27,6 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -47,6 +46,7 @@ import com.metamx.druid.QueryableNode;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
@ -56,12 +56,10 @@ import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionToolbox;
import com.metamx.druid.indexing.common.config.IndexerZkConfig;
import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.metamx.druid.indexing.common.config.TaskLogConfig;
import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory;
@ -115,7 +113,6 @@ import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@ -638,11 +635,13 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
public TaskRunner build()
{
final CuratorFramework curator = getCuratorFramework();
final RemoteTaskRunnerConfig remoteTaskRunnerConfig = getConfigFactory().build(RemoteTaskRunnerConfig.class);
return new RemoteTaskRunner(
getJsonMapper(),
getConfigFactory().build(RemoteTaskRunnerConfig.class),
remoteTaskRunnerConfig,
curator,
new PathChildrenCache(curator, indexerZkConfig.getIndexerAnnouncementPath(), true),
new SimplePathChildrenCacheFactory.Builder().withCompressed(remoteTaskRunnerConfig.enableCompression())
.build(),
configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class),
httpClient
);
@ -678,7 +677,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
{
@Override
public ResourceManagementScheduler build(TaskRunner runner)
public ResourceManagementScheduler build(RemoteTaskRunner runner)
{
return new NoopResourceManagementScheduler();
}
@ -687,7 +686,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
{
@Override
public ResourceManagementScheduler build(TaskRunner runner)
public ResourceManagementScheduler build(RemoteTaskRunner runner)
{
final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
1,

View File

@ -24,6 +24,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunner;
import com.metamx.druid.indexing.coordinator.TaskRunner;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@ -42,7 +43,7 @@ public class ResourceManagementScheduler
{
private static final Logger log = new Logger(ResourceManagementScheduler.class);
private final TaskRunner taskRunner;
private final RemoteTaskRunner taskRunner;
private final ResourceManagementStrategy resourceManagementStrategy;
private final ResourceManagementSchedulerConfig config;
private final ScheduledExecutorService exec;
@ -51,7 +52,7 @@ public class ResourceManagementScheduler
private volatile boolean started = false;
public ResourceManagementScheduler(
TaskRunner taskRunner,
RemoteTaskRunner taskRunner,
ResourceManagementStrategy resourceManagementStrategy,
ResourceManagementSchedulerConfig config,
ScheduledExecutorService exec

View File

@ -19,11 +19,11 @@
package com.metamx.druid.indexing.coordinator.scaling;
import com.metamx.druid.indexing.coordinator.TaskRunner;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunner;
/**
*/
public interface ResourceManagementSchedulerFactory
{
public ResourceManagementScheduler build(TaskRunner runner);
public ResourceManagementScheduler build(RemoteTaskRunner runner);
}

View File

@ -19,7 +19,7 @@
package com.metamx.druid.indexing.coordinator.scaling;
import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.ZkWorker;
import java.util.Collection;
@ -30,9 +30,9 @@ import java.util.Collection;
*/
public interface ResourceManagementStrategy
{
public boolean doProvision(Collection<TaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers);
public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers);
public boolean doTerminate(Collection<TaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers);
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers);
public ScalingStats getStats();
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.ZkWorker;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
@ -68,7 +69,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
@Override
public boolean doProvision(Collection<TaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
if (zkWorkers.size() >= workerSetupdDataRef.get().getMaxNumWorkers()) {
log.info(
@ -135,7 +136,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
@Override
public boolean doTerminate(Collection<TaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
Set<String> workerNodeIds = Sets.newHashSet(
autoScalingStrategy.ipToIdLookup(
@ -244,7 +245,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return scalingStats;
}
private boolean hasTaskPendingBeyondThreshold(Collection<TaskRunnerWorkItem> pendingTasks)
private boolean hasTaskPendingBeyondThreshold(Collection<RemoteTaskRunnerWorkItem> pendingTasks)
{
long now = System.currentTimeMillis();
for (TaskRunnerWorkItem pendingTask : pendingTasks) {

View File

@ -5,12 +5,15 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.metamx.common.ISE;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.cache.PathChildrenCacheFactory;
import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory;
import com.metamx.druid.indexing.TestTask;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.TaskStatus;
@ -64,7 +67,7 @@ public class RemoteTaskRunnerTest
private TestingCluster testingCluster;
private CuratorFramework cf;
private PathChildrenCache pathChildrenCache;
private PathChildrenCacheFactory pathChildrenCacheFactory;
private RemoteTaskRunner remoteTaskRunner;
private WorkerTaskMonitor workerTaskMonitor;
@ -91,7 +94,7 @@ public class RemoteTaskRunnerTest
cf.create().forPath(statusPath);
cf.create().forPath(String.format("%s/worker1", statusPath));
pathChildrenCache = new PathChildrenCache(cf, announcementsPath, true);
pathChildrenCacheFactory = new SimplePathChildrenCacheFactory.Builder().build();
worker1 = new Worker(
"worker1",
@ -362,14 +365,11 @@ public class RemoteTaskRunnerTest
new TaskConfig()
{
@Override
public File getBaseTaskDir()
public String getBaseDir()
{
try {
return File.createTempFile("billy", "yay");
}
catch (Exception e) {
throw Throwables.propagate(e);
}
File tmp = Files.createTempDir();
tmp.deleteOnExit();
return tmp.toString();
}
@Override
@ -399,7 +399,7 @@ public class RemoteTaskRunnerTest
jsonMapper,
new TestRemoteTaskRunnerConfig(),
cf,
pathChildrenCache,
pathChildrenCacheFactory,
new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, 1, null, null)),
null
);
@ -423,6 +423,12 @@ public class RemoteTaskRunnerTest
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
@Override
public boolean enableCompression()
{
return false;
}
@Override
public String getIndexerAnnouncementPath()
{

View File

@ -121,9 +121,9 @@ public class TaskLifecycleTest
new TaskConfig()
{
@Override
public File getBaseTaskDir()
public String getBaseDir()
{
return tmp;
return tmp.toString();
}
@Override

View File

@ -19,17 +19,20 @@
package com.metamx.druid.indexing.coordinator.scaling;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexing.TestTask;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.ZkWorker;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.indexing.worker.Worker;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
@ -44,6 +47,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@ -128,8 +132,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -156,8 +160,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -172,8 +176,8 @@ public class SimpleResourceManagementStrategyTest
);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -213,8 +217,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -231,8 +235,8 @@ public class SimpleResourceManagementStrategyTest
Thread.sleep(2000);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -265,8 +269,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -295,8 +299,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -310,8 +314,8 @@ public class SimpleResourceManagementStrategyTest
);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -335,18 +339,18 @@ public class SimpleResourceManagementStrategyTest
Task testTask
)
{
super(new Worker("host", "ip", 3, "version"), null);
super(new Worker("host", "ip", 3, "version"), null, new DefaultObjectMapper());
this.testTask = testTask;
}
@Override
public Collection<Task> getRunningTasks()
public Map<String, TaskStatus> getRunningTasks()
{
if (testTask == null) {
return Sets.newHashSet();
return Maps.newHashMap();
}
return Sets.newHashSet(testTask);
return ImmutableMap.of(testTask.getId(), TaskStatus.running(testTask.getId()));
}
}
}