mirror of https://github.com/apache/druid.git
Merge pull request #224 from metamx/task-announcement
Task announcement
This commit is contained in:
commit
ff8642f5a0
|
@ -27,8 +27,8 @@ import com.google.common.base.Preconditions;
|
||||||
import com.metamx.druid.indexing.common.task.TaskResource;
|
import com.metamx.druid.indexing.common.task.TaskResource;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents the status of a task. The task may be ongoing ({@link #isComplete()} false) or it may be
|
* Represents the status of a task from the perspective of the coordinator. The task may be ongoing
|
||||||
* complete ({@link #isComplete()} true).
|
* ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true).
|
||||||
* <p/>
|
* <p/>
|
||||||
* TaskStatus objects are immutable.
|
* TaskStatus objects are immutable.
|
||||||
*/
|
*/
|
||||||
|
@ -43,36 +43,38 @@ public class TaskStatus
|
||||||
|
|
||||||
public static TaskStatus running(String taskId)
|
public static TaskStatus running(String taskId)
|
||||||
{
|
{
|
||||||
return new TaskStatus(taskId, Status.RUNNING, -1, null);
|
return new TaskStatus(taskId, Status.RUNNING, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TaskStatus success(String taskId)
|
public static TaskStatus success(String taskId)
|
||||||
{
|
{
|
||||||
return new TaskStatus(taskId, Status.SUCCESS, -1, null);
|
return new TaskStatus(taskId, Status.SUCCESS, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TaskStatus failure(String taskId)
|
public static TaskStatus failure(String taskId)
|
||||||
{
|
{
|
||||||
return new TaskStatus(taskId, Status.FAILED, -1, null);
|
return new TaskStatus(taskId, Status.FAILED, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TaskStatus fromCode(String taskId, Status code)
|
||||||
|
{
|
||||||
|
return new TaskStatus(taskId, code, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final String id;
|
private final String id;
|
||||||
private final Status status;
|
private final Status status;
|
||||||
private final long duration;
|
private final long duration;
|
||||||
private final TaskResource resource;
|
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
private TaskStatus(
|
private TaskStatus(
|
||||||
@JsonProperty("id") String id,
|
@JsonProperty("id") String id,
|
||||||
@JsonProperty("status") Status status,
|
@JsonProperty("status") Status status,
|
||||||
@JsonProperty("duration") long duration,
|
@JsonProperty("duration") long duration
|
||||||
@JsonProperty("resource") TaskResource resource
|
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.status = status;
|
this.status = status;
|
||||||
this.duration = duration;
|
this.duration = duration;
|
||||||
this.resource = resource == null ? new TaskResource(id, 1) : resource;
|
|
||||||
|
|
||||||
// Check class invariants.
|
// Check class invariants.
|
||||||
Preconditions.checkNotNull(id, "id");
|
Preconditions.checkNotNull(id, "id");
|
||||||
|
@ -97,12 +99,6 @@ public class TaskStatus
|
||||||
return duration;
|
return duration;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty("resource")
|
|
||||||
public TaskResource getResource()
|
|
||||||
{
|
|
||||||
return resource;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
|
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
|
||||||
* isSuccess, or isFailure will be true at any one time.
|
* isSuccess, or isFailure will be true at any one time.
|
||||||
|
@ -144,7 +140,7 @@ public class TaskStatus
|
||||||
|
|
||||||
public TaskStatus withDuration(long _duration)
|
public TaskStatus withDuration(long _duration)
|
||||||
{
|
{
|
||||||
return new TaskStatus(id, status, _duration, resource);
|
return new TaskStatus(id, status, _duration);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -154,7 +150,6 @@ public class TaskStatus
|
||||||
.add("id", id)
|
.add("id", id)
|
||||||
.add("status", status)
|
.add("status", status)
|
||||||
.add("duration", duration)
|
.add("duration", duration)
|
||||||
.add("resource", resource)
|
|
||||||
.toString();
|
.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,12 +26,10 @@ import com.google.common.base.Optional;
|
||||||
import com.google.common.base.Splitter;
|
import com.google.common.base.Splitter;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.io.ByteStreams;
|
import com.google.common.io.ByteStreams;
|
||||||
import com.google.common.io.Closeables;
|
|
||||||
import com.google.common.io.Closer;
|
import com.google.common.io.Closer;
|
||||||
import com.google.common.io.Files;
|
import com.google.common.io.Files;
|
||||||
import com.google.common.io.InputSupplier;
|
import com.google.common.io.InputSupplier;
|
||||||
|
@ -49,7 +47,6 @@ import com.metamx.druid.indexing.worker.executor.ExecutorMain;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
@ -63,7 +60,6 @@ import java.util.Properties;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs tasks in separate processes using {@link ExecutorMain}.
|
* Runs tasks in separate processes using {@link ExecutorMain}.
|
||||||
|
@ -79,7 +75,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
private final ListeningExecutorService exec;
|
private final ListeningExecutorService exec;
|
||||||
private final ObjectMapper jsonMapper;
|
private final ObjectMapper jsonMapper;
|
||||||
|
|
||||||
private final Map<String, TaskInfo> tasks = Maps.newHashMap();
|
private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newHashMap();
|
||||||
|
|
||||||
public ForkingTaskRunner(
|
public ForkingTaskRunner(
|
||||||
ForkingTaskRunnerConfig config,
|
ForkingTaskRunnerConfig config,
|
||||||
|
@ -109,7 +105,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
if (!tasks.containsKey(task.getId())) {
|
if (!tasks.containsKey(task.getId())) {
|
||||||
tasks.put(
|
tasks.put(
|
||||||
task.getId(),
|
task.getId(),
|
||||||
new TaskInfo(
|
new ForkingTaskRunnerWorkItem(
|
||||||
|
task,
|
||||||
exec.submit(
|
exec.submit(
|
||||||
new Callable<TaskStatus>()
|
new Callable<TaskStatus>()
|
||||||
{
|
{
|
||||||
|
@ -135,17 +132,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
|
|
||||||
// time to adjust process holders
|
// time to adjust process holders
|
||||||
synchronized (tasks) {
|
synchronized (tasks) {
|
||||||
final TaskInfo taskInfo = tasks.get(task.getId());
|
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
|
||||||
|
|
||||||
if (taskInfo.shutdown) {
|
if (taskWorkItem.shutdown) {
|
||||||
throw new IllegalStateException("Task has been shut down!");
|
throw new IllegalStateException("Task has been shut down!");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taskInfo == null) {
|
if (taskWorkItem == null) {
|
||||||
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
|
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taskInfo.processHolder != null) {
|
if (taskWorkItem.processHolder != null) {
|
||||||
throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId());
|
throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -206,13 +203,13 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
jsonMapper.writeValue(taskFile, task);
|
jsonMapper.writeValue(taskFile, task);
|
||||||
|
|
||||||
log.info("Running command: %s", Joiner.on(" ").join(command));
|
log.info("Running command: %s", Joiner.on(" ").join(command));
|
||||||
taskInfo.processHolder = new ProcessHolder(
|
taskWorkItem.processHolder = new ProcessHolder(
|
||||||
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
|
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
|
||||||
logFile,
|
logFile,
|
||||||
childPort
|
childPort
|
||||||
);
|
);
|
||||||
|
|
||||||
processHolder = taskInfo.processHolder;
|
processHolder = taskWorkItem.processHolder;
|
||||||
processHolder.registerWithCloser(closer);
|
processHolder.registerWithCloser(closer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,9 +258,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
finally {
|
finally {
|
||||||
try {
|
try {
|
||||||
synchronized (tasks) {
|
synchronized (tasks) {
|
||||||
final TaskInfo taskInfo = tasks.remove(task.getId());
|
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
|
||||||
if (taskInfo != null && taskInfo.processHolder != null) {
|
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
|
||||||
taskInfo.processHolder.process.destroy();
|
taskWorkItem.processHolder.process.destroy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,7 +278,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
return tasks.get(task.getId()).statusFuture;
|
return tasks.get(task.getId()).getResult();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,10 +288,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
synchronized (tasks) {
|
synchronized (tasks) {
|
||||||
exec.shutdown();
|
exec.shutdown();
|
||||||
|
|
||||||
for (TaskInfo taskInfo : tasks.values()) {
|
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
|
||||||
if (taskInfo.processHolder != null) {
|
if (taskWorkItem.processHolder != null) {
|
||||||
log.info("Destroying process: %s", taskInfo.processHolder.process);
|
log.info("Destroying process: %s", taskWorkItem.processHolder.process);
|
||||||
taskInfo.processHolder.process.destroy();
|
taskWorkItem.processHolder.process.destroy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -303,7 +300,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
@Override
|
@Override
|
||||||
public void shutdown(final String taskid)
|
public void shutdown(final String taskid)
|
||||||
{
|
{
|
||||||
final TaskInfo taskInfo;
|
final ForkingTaskRunnerWorkItem taskInfo;
|
||||||
|
|
||||||
synchronized (tasks) {
|
synchronized (tasks) {
|
||||||
taskInfo = tasks.get(taskid);
|
taskInfo = tasks.get(taskid);
|
||||||
|
@ -326,13 +323,29 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
@Override
|
@Override
|
||||||
public Collection<TaskRunnerWorkItem> getRunningTasks()
|
public Collection<TaskRunnerWorkItem> getRunningTasks()
|
||||||
{
|
{
|
||||||
return ImmutableList.of();
|
synchronized (tasks) {
|
||||||
|
final List<TaskRunnerWorkItem> ret = Lists.newArrayList();
|
||||||
|
for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
|
||||||
|
if (taskWorkItem.processHolder != null) {
|
||||||
|
ret.add(taskWorkItem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<TaskRunnerWorkItem> getPendingTasks()
|
public Collection<TaskRunnerWorkItem> getPendingTasks()
|
||||||
{
|
{
|
||||||
return ImmutableList.of();
|
synchronized (tasks) {
|
||||||
|
final List<TaskRunnerWorkItem> ret = Lists.newArrayList();
|
||||||
|
for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
|
||||||
|
if (taskWorkItem.processHolder == null) {
|
||||||
|
ret.add(taskWorkItem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -347,9 +360,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
final ProcessHolder processHolder;
|
final ProcessHolder processHolder;
|
||||||
|
|
||||||
synchronized (tasks) {
|
synchronized (tasks) {
|
||||||
final TaskInfo taskInfo = tasks.get(taskid);
|
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(taskid);
|
||||||
if (taskInfo != null && taskInfo.processHolder != null) {
|
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
|
||||||
processHolder = taskInfo.processHolder;
|
processHolder = taskWorkItem.processHolder;
|
||||||
} else {
|
} else {
|
||||||
return Optional.absent();
|
return Optional.absent();
|
||||||
}
|
}
|
||||||
|
@ -380,13 +393,13 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
int port = config.getStartPort();
|
int port = config.getStartPort();
|
||||||
int maxPortSoFar = -1;
|
int maxPortSoFar = -1;
|
||||||
|
|
||||||
for (TaskInfo taskInfo : tasks.values()) {
|
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
|
||||||
if (taskInfo.processHolder != null) {
|
if (taskWorkItem.processHolder != null) {
|
||||||
if (taskInfo.processHolder.port > maxPortSoFar) {
|
if (taskWorkItem.processHolder.port > maxPortSoFar) {
|
||||||
maxPortSoFar = taskInfo.processHolder.port;
|
maxPortSoFar = taskWorkItem.processHolder.port;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taskInfo.processHolder.port == port) {
|
if (taskWorkItem.processHolder.port == port) {
|
||||||
port = maxPortSoFar + 1;
|
port = maxPortSoFar + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -396,15 +409,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TaskInfo
|
private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
|
||||||
{
|
{
|
||||||
private final ListenableFuture<TaskStatus> statusFuture;
|
|
||||||
private volatile boolean shutdown = false;
|
private volatile boolean shutdown = false;
|
||||||
private volatile ProcessHolder processHolder = null;
|
private volatile ProcessHolder processHolder = null;
|
||||||
|
|
||||||
private TaskInfo(ListenableFuture<TaskStatus> statusFuture)
|
private ForkingTaskRunnerWorkItem(
|
||||||
|
Task task,
|
||||||
|
ListenableFuture<TaskStatus> statusFuture
|
||||||
|
)
|
||||||
{
|
{
|
||||||
this.statusFuture = statusFuture;
|
super(task, statusFuture);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,6 +41,7 @@ import com.metamx.druid.indexing.common.task.Task;
|
||||||
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
|
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
|
||||||
import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
|
import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
|
||||||
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
|
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
|
||||||
|
import com.metamx.druid.indexing.worker.TaskAnnouncement;
|
||||||
import com.metamx.druid.indexing.worker.Worker;
|
import com.metamx.druid.indexing.worker.Worker;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import com.metamx.http.client.HttpClient;
|
import com.metamx.http.client.HttpClient;
|
||||||
|
@ -291,9 +292,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
runningTasks.remove(task.getId());
|
runningTasks.remove(task.getId());
|
||||||
} else {
|
} else {
|
||||||
log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
|
log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
|
||||||
TaskStatus status = zkWorker.getRunningTasks().get(task.getId());
|
TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
|
||||||
if (status.isComplete()) {
|
if (announcement.getTaskStatus().isComplete()) {
|
||||||
taskComplete(runningTask, zkWorker, task.getId(), status);
|
taskComplete(runningTask, zkWorker, task.getId(), announcement.getTaskStatus());
|
||||||
}
|
}
|
||||||
return runningTask.getResult();
|
return runningTask.getResult();
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,8 +35,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
|
||||||
private final Task task;
|
private final Task task;
|
||||||
private final ListenableFuture<TaskStatus> result;
|
private final ListenableFuture<TaskStatus> result;
|
||||||
private final DateTime createdTime;
|
private final DateTime createdTime;
|
||||||
|
private final DateTime queueInsertionTime;
|
||||||
private volatile DateTime queueInsertionTime;
|
|
||||||
|
|
||||||
public TaskRunnerWorkItem(
|
public TaskRunnerWorkItem(
|
||||||
Task task,
|
Task task,
|
||||||
|
|
|
@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.metamx.druid.indexing.common.TaskStatus;
|
import com.metamx.druid.indexing.common.TaskStatus;
|
||||||
import com.metamx.druid.indexing.common.task.Task;
|
import com.metamx.druid.indexing.common.task.Task;
|
||||||
|
import com.metamx.druid.indexing.worker.TaskAnnouncement;
|
||||||
import com.metamx.druid.indexing.worker.Worker;
|
import com.metamx.druid.indexing.worker.Worker;
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData;
|
import org.apache.curator.framework.recipes.cache.ChildData;
|
||||||
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
|
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
|
||||||
|
@ -48,7 +49,7 @@ public class ZkWorker implements Closeable
|
||||||
{
|
{
|
||||||
private final Worker worker;
|
private final Worker worker;
|
||||||
private final PathChildrenCache statusCache;
|
private final PathChildrenCache statusCache;
|
||||||
private final Function<ChildData, TaskStatus> cacheConverter;
|
private final Function<ChildData, TaskAnnouncement> cacheConverter;
|
||||||
|
|
||||||
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
|
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
|
||||||
|
|
||||||
|
@ -56,13 +57,13 @@ public class ZkWorker implements Closeable
|
||||||
{
|
{
|
||||||
this.worker = worker;
|
this.worker = worker;
|
||||||
this.statusCache = statusCache;
|
this.statusCache = statusCache;
|
||||||
this.cacheConverter = new Function<ChildData, TaskStatus>()
|
this.cacheConverter = new Function<ChildData, TaskAnnouncement>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public TaskStatus apply(ChildData input)
|
public TaskAnnouncement apply(ChildData input)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
return jsonMapper.readValue(input.getData(), TaskStatus.class);
|
return jsonMapper.readValue(input.getData(), TaskAnnouncement.class);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
|
@ -93,14 +94,14 @@ public class ZkWorker implements Closeable
|
||||||
return getRunningTasks().keySet();
|
return getRunningTasks().keySet();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, TaskStatus> getRunningTasks()
|
public Map<String, TaskAnnouncement> getRunningTasks()
|
||||||
{
|
{
|
||||||
Map<String, TaskStatus> retVal = Maps.newHashMap();
|
Map<String, TaskAnnouncement> retVal = Maps.newHashMap();
|
||||||
for (TaskStatus taskStatus : Lists.transform(
|
for (TaskAnnouncement taskAnnouncement : Lists.transform(
|
||||||
statusCache.getCurrentData(),
|
statusCache.getCurrentData(),
|
||||||
cacheConverter
|
cacheConverter
|
||||||
)) {
|
)) {
|
||||||
retVal.put(taskStatus.getId(), taskStatus);
|
retVal.put(taskAnnouncement.getTaskStatus().getId(), taskAnnouncement);
|
||||||
}
|
}
|
||||||
|
|
||||||
return retVal;
|
return retVal;
|
||||||
|
@ -110,8 +111,8 @@ public class ZkWorker implements Closeable
|
||||||
public int getCurrCapacityUsed()
|
public int getCurrCapacityUsed()
|
||||||
{
|
{
|
||||||
int currCapacity = 0;
|
int currCapacity = 0;
|
||||||
for (TaskStatus taskStatus : getRunningTasks().values()) {
|
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
|
||||||
currCapacity += taskStatus.getResource().getRequiredCapacity();
|
currCapacity += taskAnnouncement.getTaskResource().getRequiredCapacity();
|
||||||
}
|
}
|
||||||
return currCapacity;
|
return currCapacity;
|
||||||
}
|
}
|
||||||
|
@ -120,8 +121,8 @@ public class ZkWorker implements Closeable
|
||||||
public Set<String> getAvailabilityGroups()
|
public Set<String> getAvailabilityGroups()
|
||||||
{
|
{
|
||||||
Set<String> retVal = Sets.newHashSet();
|
Set<String> retVal = Sets.newHashSet();
|
||||||
for (TaskStatus taskStatus : getRunningTasks().values()) {
|
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
|
||||||
retVal.add(taskStatus.getResource().getAvailabilityGroup());
|
retVal.add(taskAnnouncement.getTaskResource().getAvailabilityGroup());
|
||||||
}
|
}
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,87 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.indexing.worker;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.metamx.druid.indexing.common.TaskStatus;
|
||||||
|
import com.metamx.druid.indexing.common.task.Task;
|
||||||
|
import com.metamx.druid.indexing.common.task.TaskResource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used by workers to announce the status of tasks they are currently running. This class is immutable.
|
||||||
|
*/
|
||||||
|
public class TaskAnnouncement
|
||||||
|
{
|
||||||
|
private final TaskStatus taskStatus;
|
||||||
|
private final TaskResource taskResource;
|
||||||
|
|
||||||
|
public static TaskAnnouncement create(Task task, TaskStatus status)
|
||||||
|
{
|
||||||
|
Preconditions.checkArgument(status.getId().equals(task.getId()), "task id == status id");
|
||||||
|
return new TaskAnnouncement(null, null, status, task.getTaskResource());
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
private TaskAnnouncement(
|
||||||
|
@JsonProperty("id") String taskId,
|
||||||
|
@JsonProperty("status") TaskStatus.Status status,
|
||||||
|
@JsonProperty("taskStatus") TaskStatus taskStatus,
|
||||||
|
@JsonProperty("taskResource") TaskResource taskResource
|
||||||
|
)
|
||||||
|
{
|
||||||
|
if (taskStatus != null) {
|
||||||
|
this.taskStatus = taskStatus;
|
||||||
|
} else {
|
||||||
|
// Can be removed when backwards compat is no longer needed
|
||||||
|
this.taskStatus = TaskStatus.fromCode(taskId, status);
|
||||||
|
}
|
||||||
|
this.taskResource = taskResource == null ? new TaskResource(this.taskStatus.getId(), 1) : taskResource;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Can be removed when backwards compat is no longer needed
|
||||||
|
@JsonProperty("id")
|
||||||
|
@Deprecated
|
||||||
|
public String getTaskId()
|
||||||
|
{
|
||||||
|
return taskStatus.getId();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Can be removed when backwards compat is no longer needed
|
||||||
|
@JsonProperty("status")
|
||||||
|
@Deprecated
|
||||||
|
public TaskStatus.Status getStatus()
|
||||||
|
{
|
||||||
|
return taskStatus.getStatusCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty("taskStatus")
|
||||||
|
public TaskStatus getTaskStatus()
|
||||||
|
{
|
||||||
|
return taskStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty("taskResource")
|
||||||
|
public TaskResource getTaskResource()
|
||||||
|
{
|
||||||
|
return taskResource;
|
||||||
|
}
|
||||||
|
}
|
|
@ -180,7 +180,7 @@ public class WorkerCuratorCoordinator
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void announceStatus(TaskStatus status)
|
public void announceTask(TaskAnnouncement announcement)
|
||||||
{
|
{
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
if (!started) {
|
if (!started) {
|
||||||
|
@ -188,7 +188,7 @@ public class WorkerCuratorCoordinator
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
byte[] rawBytes = jsonMapper.writeValueAsBytes(status);
|
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
|
||||||
if (rawBytes.length > config.getMaxNumBytes()) {
|
if (rawBytes.length > config.getMaxNumBytes()) {
|
||||||
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
|
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
|
||||||
}
|
}
|
||||||
|
@ -196,7 +196,7 @@ public class WorkerCuratorCoordinator
|
||||||
curatorFramework.create()
|
curatorFramework.create()
|
||||||
.withMode(CreateMode.EPHEMERAL)
|
.withMode(CreateMode.EPHEMERAL)
|
||||||
.forPath(
|
.forPath(
|
||||||
getStatusPathForId(status.getId()), rawBytes
|
getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
@ -205,7 +205,7 @@ public class WorkerCuratorCoordinator
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateStatus(TaskStatus status)
|
public void updateAnnouncement(TaskAnnouncement announcement)
|
||||||
{
|
{
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
if (!started) {
|
if (!started) {
|
||||||
|
@ -213,18 +213,18 @@ public class WorkerCuratorCoordinator
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (curatorFramework.checkExists().forPath(getStatusPathForId(status.getId())) == null) {
|
if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) {
|
||||||
announceStatus(status);
|
announceTask(announcement);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
byte[] rawBytes = jsonMapper.writeValueAsBytes(status);
|
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
|
||||||
if (rawBytes.length > config.getMaxNumBytes()) {
|
if (rawBytes.length > config.getMaxNumBytes()) {
|
||||||
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
|
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
curatorFramework.setData()
|
curatorFramework.setData()
|
||||||
.forPath(
|
.forPath(
|
||||||
getStatusPathForId(status.getId()), rawBytes
|
getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
|
|
@ -118,7 +118,12 @@ public class WorkerTaskMonitor
|
||||||
TaskStatus taskStatus;
|
TaskStatus taskStatus;
|
||||||
try {
|
try {
|
||||||
workerCuratorCoordinator.unannounceTask(task.getId());
|
workerCuratorCoordinator.unannounceTask(task.getId());
|
||||||
workerCuratorCoordinator.announceStatus(TaskStatus.running(task.getId()));
|
workerCuratorCoordinator.announceTask(
|
||||||
|
TaskAnnouncement.create(
|
||||||
|
task,
|
||||||
|
TaskStatus.running(task.getId())
|
||||||
|
)
|
||||||
|
);
|
||||||
taskStatus = taskRunner.run(task).get();
|
taskStatus = taskRunner.run(task).get();
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
@ -134,7 +139,7 @@ public class WorkerTaskMonitor
|
||||||
taskStatus = taskStatus.withDuration(System.currentTimeMillis() - startTime);
|
taskStatus = taskStatus.withDuration(System.currentTimeMillis() - startTime);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
workerCuratorCoordinator.updateStatus(taskStatus);
|
workerCuratorCoordinator.updateAnnouncement(TaskAnnouncement.create(task, taskStatus));
|
||||||
log.info(
|
log.info(
|
||||||
"Job's finished. Completed [%s] with status [%s]",
|
"Job's finished. Completed [%s] with status [%s]",
|
||||||
task.getId(),
|
task.getId(),
|
||||||
|
|
|
@ -31,6 +31,7 @@ import com.metamx.druid.indexing.common.task.Task;
|
||||||
import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
|
import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
|
||||||
import com.metamx.druid.indexing.coordinator.ZkWorker;
|
import com.metamx.druid.indexing.coordinator.ZkWorker;
|
||||||
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
|
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
|
||||||
|
import com.metamx.druid.indexing.worker.TaskAnnouncement;
|
||||||
import com.metamx.druid.indexing.worker.Worker;
|
import com.metamx.druid.indexing.worker.Worker;
|
||||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
|
@ -351,12 +352,12 @@ public class SimpleResourceManagementStrategyTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, TaskStatus> getRunningTasks()
|
public Map<String, TaskAnnouncement> getRunningTasks()
|
||||||
{
|
{
|
||||||
if (testTask == null) {
|
if (testTask == null) {
|
||||||
return Maps.newHashMap();
|
return Maps.newHashMap();
|
||||||
}
|
}
|
||||||
return ImmutableMap.of(testTask.getId(), TaskStatus.running(testTask.getId()));
|
return ImmutableMap.of(testTask.getId(), TaskAnnouncement.create(testTask, TaskStatus.running(testTask.getId())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
package com.metamx.druid.indexing.worker;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.metamx.druid.QueryGranularity;
|
||||||
|
import com.metamx.druid.aggregation.AggregatorFactory;
|
||||||
|
import com.metamx.druid.index.v1.IndexGranularity;
|
||||||
|
import com.metamx.druid.indexing.common.TaskStatus;
|
||||||
|
import com.metamx.druid.indexing.common.task.RealtimeIndexTask;
|
||||||
|
import com.metamx.druid.indexing.common.task.Task;
|
||||||
|
import com.metamx.druid.indexing.common.task.TaskResource;
|
||||||
|
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||||
|
import com.metamx.druid.realtime.Schema;
|
||||||
|
import com.metamx.druid.shard.NoneShardSpec;
|
||||||
|
import junit.framework.Assert;
|
||||||
|
import org.joda.time.Period;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TaskAnnouncementTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testBackwardsCompatibleSerde() throws Exception
|
||||||
|
{
|
||||||
|
final Task task = new RealtimeIndexTask(
|
||||||
|
"theid",
|
||||||
|
new TaskResource("rofl", 2),
|
||||||
|
new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
new Period("PT10M"),
|
||||||
|
IndexGranularity.HOUR,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
final TaskStatus status = TaskStatus.running(task.getId());
|
||||||
|
final TaskAnnouncement announcement = TaskAnnouncement.create(task, status);
|
||||||
|
|
||||||
|
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||||
|
final String statusJson = jsonMapper.writeValueAsString(status);
|
||||||
|
final String announcementJson = jsonMapper.writeValueAsString(announcement);
|
||||||
|
|
||||||
|
final TaskStatus statusFromStatus = jsonMapper.readValue(statusJson, TaskStatus.class);
|
||||||
|
final TaskStatus statusFromAnnouncement = jsonMapper.readValue(announcementJson, TaskStatus.class);
|
||||||
|
final TaskAnnouncement announcementFromStatus = jsonMapper.readValue(statusJson, TaskAnnouncement.class);
|
||||||
|
final TaskAnnouncement announcementFromAnnouncement = jsonMapper.readValue(
|
||||||
|
announcementJson,
|
||||||
|
TaskAnnouncement.class
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals("theid", statusFromStatus.getId());
|
||||||
|
Assert.assertEquals("theid", statusFromAnnouncement.getId());
|
||||||
|
Assert.assertEquals("theid", announcementFromStatus.getTaskStatus().getId());
|
||||||
|
Assert.assertEquals("theid", announcementFromAnnouncement.getTaskStatus().getId());
|
||||||
|
|
||||||
|
Assert.assertEquals("theid", announcementFromStatus.getTaskResource().getAvailabilityGroup());
|
||||||
|
Assert.assertEquals("rofl", announcementFromAnnouncement.getTaskResource().getAvailabilityGroup());
|
||||||
|
|
||||||
|
Assert.assertEquals(1, announcementFromStatus.getTaskResource().getRequiredCapacity());
|
||||||
|
Assert.assertEquals(2, announcementFromAnnouncement.getTaskResource().getRequiredCapacity());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue