diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java
index d83b7d0de40..bb7c6c4a10f 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java
@@ -27,8 +27,8 @@ import com.google.common.base.Preconditions;
import com.metamx.druid.indexing.common.task.TaskResource;
/**
- * Represents the status of a task. The task may be ongoing ({@link #isComplete()} false) or it may be
- * complete ({@link #isComplete()} true).
+ * Represents the status of a task from the perspective of the coordinator. The task may be ongoing
+ * ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true).
*
* TaskStatus objects are immutable.
*/
@@ -43,36 +43,38 @@ public class TaskStatus
public static TaskStatus running(String taskId)
{
- return new TaskStatus(taskId, Status.RUNNING, -1, null);
+ return new TaskStatus(taskId, Status.RUNNING, -1);
}
public static TaskStatus success(String taskId)
{
- return new TaskStatus(taskId, Status.SUCCESS, -1, null);
+ return new TaskStatus(taskId, Status.SUCCESS, -1);
}
public static TaskStatus failure(String taskId)
{
- return new TaskStatus(taskId, Status.FAILED, -1, null);
+ return new TaskStatus(taskId, Status.FAILED, -1);
+ }
+
+ public static TaskStatus fromCode(String taskId, Status code)
+ {
+ return new TaskStatus(taskId, code, -1);
}
private final String id;
private final Status status;
private final long duration;
- private final TaskResource resource;
@JsonCreator
private TaskStatus(
@JsonProperty("id") String id,
@JsonProperty("status") Status status,
- @JsonProperty("duration") long duration,
- @JsonProperty("resource") TaskResource resource
+ @JsonProperty("duration") long duration
)
{
this.id = id;
this.status = status;
this.duration = duration;
- this.resource = resource == null ? new TaskResource(id, 1) : resource;
// Check class invariants.
Preconditions.checkNotNull(id, "id");
@@ -97,12 +99,6 @@ public class TaskStatus
return duration;
}
- @JsonProperty("resource")
- public TaskResource getResource()
- {
- return resource;
- }
-
/**
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
* isSuccess, or isFailure will be true at any one time.
@@ -144,7 +140,7 @@ public class TaskStatus
public TaskStatus withDuration(long _duration)
{
- return new TaskStatus(id, status, _duration, resource);
+ return new TaskStatus(id, status, _duration);
}
@Override
@@ -154,7 +150,6 @@ public class TaskStatus
.add("id", id)
.add("status", status)
.add("duration", duration)
- .add("resource", resource)
.toString();
}
}
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
index 8720525ae1e..f7a81a8af64 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
@@ -41,6 +41,7 @@ import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
+import com.metamx.druid.indexing.worker.TaskAnnouncement;
import com.metamx.druid.indexing.worker.Worker;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
@@ -291,9 +292,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
runningTasks.remove(task.getId());
} else {
log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
- TaskStatus status = zkWorker.getRunningTasks().get(task.getId());
- if (status.isComplete()) {
- taskComplete(runningTask, zkWorker, task.getId(), status);
+ TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
+ if (announcement.getTaskStatus().isComplete()) {
+ taskComplete(runningTask, zkWorker, task.getId(), announcement.getTaskStatus());
}
return runningTask.getResult();
}
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java
index eb23c429a8a..c1e33525e23 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
+import com.metamx.druid.indexing.worker.TaskAnnouncement;
import com.metamx.druid.indexing.worker.Worker;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@@ -48,7 +49,7 @@ public class ZkWorker implements Closeable
{
private final Worker worker;
private final PathChildrenCache statusCache;
- private final Function cacheConverter;
+ private final Function cacheConverter;
private AtomicReference lastCompletedTaskTime = new AtomicReference(new DateTime());
@@ -56,13 +57,13 @@ public class ZkWorker implements Closeable
{
this.worker = worker;
this.statusCache = statusCache;
- this.cacheConverter = new Function()
+ this.cacheConverter = new Function()
{
@Override
- public TaskStatus apply(ChildData input)
+ public TaskAnnouncement apply(ChildData input)
{
try {
- return jsonMapper.readValue(input.getData(), TaskStatus.class);
+ return jsonMapper.readValue(input.getData(), TaskAnnouncement.class);
}
catch (Exception e) {
throw Throwables.propagate(e);
@@ -93,14 +94,14 @@ public class ZkWorker implements Closeable
return getRunningTasks().keySet();
}
- public Map getRunningTasks()
+ public Map getRunningTasks()
{
- Map retVal = Maps.newHashMap();
- for (TaskStatus taskStatus : Lists.transform(
+ Map retVal = Maps.newHashMap();
+ for (TaskAnnouncement taskAnnouncement : Lists.transform(
statusCache.getCurrentData(),
cacheConverter
)) {
- retVal.put(taskStatus.getId(), taskStatus);
+ retVal.put(taskAnnouncement.getTaskStatus().getId(), taskAnnouncement);
}
return retVal;
@@ -110,8 +111,8 @@ public class ZkWorker implements Closeable
public int getCurrCapacityUsed()
{
int currCapacity = 0;
- for (TaskStatus taskStatus : getRunningTasks().values()) {
- currCapacity += taskStatus.getResource().getRequiredCapacity();
+ for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
+ currCapacity += taskAnnouncement.getTaskResource().getRequiredCapacity();
}
return currCapacity;
}
@@ -120,8 +121,8 @@ public class ZkWorker implements Closeable
public Set getAvailabilityGroups()
{
Set retVal = Sets.newHashSet();
- for (TaskStatus taskStatus : getRunningTasks().values()) {
- retVal.add(taskStatus.getResource().getAvailabilityGroup());
+ for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
+ retVal.add(taskAnnouncement.getTaskResource().getAvailabilityGroup());
}
return retVal;
}
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/TaskAnnouncement.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/TaskAnnouncement.java
new file mode 100644
index 00000000000..1af8d6d47cd
--- /dev/null
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/TaskAnnouncement.java
@@ -0,0 +1,68 @@
+package com.metamx.druid.indexing.worker;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.metamx.druid.indexing.common.TaskStatus;
+import com.metamx.druid.indexing.common.task.Task;
+import com.metamx.druid.indexing.common.task.TaskResource;
+
+/**
+ * Used by workers to announce the status of tasks they are currently running. This class is immutable.
+ */
+public class TaskAnnouncement
+{
+ private final TaskStatus taskStatus;
+ private final TaskResource taskResource;
+
+ public static TaskAnnouncement create(Task task, TaskStatus status)
+ {
+ Preconditions.checkArgument(status.getId().equals(task.getId()), "task id == status id");
+ return new TaskAnnouncement(null, null, status, task.getTaskResource());
+ }
+
+ @JsonCreator
+ private TaskAnnouncement(
+ @JsonProperty("id") String taskId,
+ @JsonProperty("status") TaskStatus.Status status,
+ @JsonProperty("taskStatus") TaskStatus taskStatus,
+ @JsonProperty("taskResource") TaskResource taskResource
+ )
+ {
+ if (taskStatus != null) {
+ this.taskStatus = taskStatus;
+ } else {
+ // Can be removed when backwards compat is no longer needed
+ this.taskStatus = TaskStatus.fromCode(taskId, status);
+ }
+ this.taskResource = taskResource == null ? new TaskResource(this.taskStatus.getId(), 1) : taskResource;
+ }
+
+ // Can be removed when backwards compat is no longer needed
+ @JsonProperty("id")
+ @Deprecated
+ public String getTaskId()
+ {
+ return taskStatus.getId();
+ }
+
+ // Can be removed when backwards compat is no longer needed
+ @JsonProperty("status")
+ @Deprecated
+ public TaskStatus.Status getStatus()
+ {
+ return taskStatus.getStatusCode();
+ }
+
+ @JsonProperty("taskStatus")
+ public TaskStatus getTaskStatus()
+ {
+ return taskStatus;
+ }
+
+ @JsonProperty("taskResource")
+ public TaskResource getTaskResource()
+ {
+ return taskResource;
+ }
+}
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java
index 89d6c313487..8c597bba6e9 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java
@@ -180,7 +180,7 @@ public class WorkerCuratorCoordinator
}
}
- public void announceStatus(TaskStatus status)
+ public void announceTask(TaskAnnouncement announcement)
{
synchronized (lock) {
if (!started) {
@@ -188,7 +188,7 @@ public class WorkerCuratorCoordinator
}
try {
- byte[] rawBytes = jsonMapper.writeValueAsBytes(status);
+ byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
if (rawBytes.length > config.getMaxNumBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
}
@@ -196,7 +196,7 @@ public class WorkerCuratorCoordinator
curatorFramework.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(
- getStatusPathForId(status.getId()), rawBytes
+ getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes
);
}
catch (Exception e) {
@@ -205,7 +205,7 @@ public class WorkerCuratorCoordinator
}
}
- public void updateStatus(TaskStatus status)
+ public void updateAnnouncement(TaskAnnouncement announcement)
{
synchronized (lock) {
if (!started) {
@@ -213,18 +213,18 @@ public class WorkerCuratorCoordinator
}
try {
- if (curatorFramework.checkExists().forPath(getStatusPathForId(status.getId())) == null) {
- announceStatus(status);
+ if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) {
+ announceTask(announcement);
return;
}
- byte[] rawBytes = jsonMapper.writeValueAsBytes(status);
+ byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
if (rawBytes.length > config.getMaxNumBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
}
curatorFramework.setData()
.forPath(
- getStatusPathForId(status.getId()), rawBytes
+ getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes
);
}
catch (Exception e) {
@@ -232,4 +232,4 @@ public class WorkerCuratorCoordinator
}
}
}
-}
\ No newline at end of file
+}
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerTaskMonitor.java
index 584ec0a2ef6..ed3e3e993b2 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerTaskMonitor.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerTaskMonitor.java
@@ -118,7 +118,12 @@ public class WorkerTaskMonitor
TaskStatus taskStatus;
try {
workerCuratorCoordinator.unannounceTask(task.getId());
- workerCuratorCoordinator.announceStatus(TaskStatus.running(task.getId()));
+ workerCuratorCoordinator.announceTask(
+ TaskAnnouncement.create(
+ task,
+ TaskStatus.running(task.getId())
+ )
+ );
taskStatus = taskRunner.run(task).get();
}
catch (Exception e) {
@@ -134,7 +139,7 @@ public class WorkerTaskMonitor
taskStatus = taskStatus.withDuration(System.currentTimeMillis() - startTime);
try {
- workerCuratorCoordinator.updateStatus(taskStatus);
+ workerCuratorCoordinator.updateAnnouncement(TaskAnnouncement.create(task, taskStatus));
log.info(
"Job's finished. Completed [%s] with status [%s]",
task.getId(),
diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java
index f73d289526d..07d8346f65e 100644
--- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java
+++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java
@@ -31,6 +31,7 @@ import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.ZkWorker;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
+import com.metamx.druid.indexing.worker.TaskAnnouncement;
import com.metamx.druid.indexing.worker.Worker;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.emitter.EmittingLogger;
@@ -351,12 +352,12 @@ public class SimpleResourceManagementStrategyTest
}
@Override
- public Map getRunningTasks()
+ public Map getRunningTasks()
{
if (testTask == null) {
return Maps.newHashMap();
}
- return ImmutableMap.of(testTask.getId(), TaskStatus.running(testTask.getId()));
+ return ImmutableMap.of(testTask.getId(), TaskAnnouncement.create(testTask, TaskStatus.running(testTask.getId())));
}
}
}
diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/worker/TaskAnnouncementTest.java
new file mode 100644
index 00000000000..5202609f42d
--- /dev/null
+++ b/indexing-service/src/test/java/com/metamx/druid/indexing/worker/TaskAnnouncementTest.java
@@ -0,0 +1,59 @@
+package com.metamx.druid.indexing.worker;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.metamx.druid.QueryGranularity;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.index.v1.IndexGranularity;
+import com.metamx.druid.indexing.common.TaskStatus;
+import com.metamx.druid.indexing.common.task.RealtimeIndexTask;
+import com.metamx.druid.indexing.common.task.Task;
+import com.metamx.druid.indexing.common.task.TaskResource;
+import com.metamx.druid.jackson.DefaultObjectMapper;
+import com.metamx.druid.realtime.Schema;
+import com.metamx.druid.shard.NoneShardSpec;
+import junit.framework.Assert;
+import org.joda.time.Period;
+import org.junit.Test;
+
+public class TaskAnnouncementTest
+{
+ @Test
+ public void testBackwardsCompatibleSerde() throws Exception
+ {
+ final Task task = new RealtimeIndexTask(
+ "theid",
+ new TaskResource("rofl", 2),
+ new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
+ null,
+ null,
+ new Period("PT10M"),
+ IndexGranularity.HOUR,
+ null
+ );
+ final TaskStatus status = TaskStatus.running(task.getId());
+ final TaskAnnouncement announcement = TaskAnnouncement.create(task, status);
+
+ final ObjectMapper jsonMapper = new DefaultObjectMapper();
+ final String statusJson = jsonMapper.writeValueAsString(status);
+ final String announcementJson = jsonMapper.writeValueAsString(announcement);
+
+ final TaskStatus statusFromStatus = jsonMapper.readValue(statusJson, TaskStatus.class);
+ final TaskStatus statusFromAnnouncement = jsonMapper.readValue(announcementJson, TaskStatus.class);
+ final TaskAnnouncement announcementFromStatus = jsonMapper.readValue(statusJson, TaskAnnouncement.class);
+ final TaskAnnouncement announcementFromAnnouncement = jsonMapper.readValue(
+ announcementJson,
+ TaskAnnouncement.class
+ );
+
+ Assert.assertEquals("theid", statusFromStatus.getId());
+ Assert.assertEquals("theid", statusFromAnnouncement.getId());
+ Assert.assertEquals("theid", announcementFromStatus.getTaskStatus().getId());
+ Assert.assertEquals("theid", announcementFromAnnouncement.getTaskStatus().getId());
+
+ Assert.assertEquals("theid", announcementFromStatus.getTaskResource().getAvailabilityGroup());
+ Assert.assertEquals("rofl", announcementFromAnnouncement.getTaskResource().getAvailabilityGroup());
+
+ Assert.assertEquals(1, announcementFromStatus.getTaskResource().getRequiredCapacity());
+ Assert.assertEquals(2, announcementFromAnnouncement.getTaskResource().getRequiredCapacity());
+ }
+}