From 960931476579c5db0175fe882e8de3445eda9bef Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 20 Aug 2013 14:04:28 -0700 Subject: [PATCH 1/3] ForkingTaskRunner: Make TaskInfo into ForkingTaskRunnerWorkItem This allows the API/GUI to return reasonable results when the primary task runner is a ForkingTaskRunner. --- .../coordinator/ForkingTaskRunner.java | 85 +++++++++++-------- .../coordinator/TaskRunnerWorkItem.java | 3 +- 2 files changed, 51 insertions(+), 37 deletions(-) diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java index 0eb0094c6c0..3fe72d35d8d 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java @@ -26,12 +26,10 @@ import com.google.common.base.Optional; import com.google.common.base.Splitter; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.ByteStreams; -import com.google.common.io.Closeables; import com.google.common.io.Closer; import com.google.common.io.Files; import com.google.common.io.InputSupplier; @@ -49,7 +47,6 @@ import com.metamx.druid.indexing.worker.executor.ExecutorMain; import com.metamx.emitter.EmittingLogger; import org.apache.commons.io.FileUtils; -import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -63,7 +60,6 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicInteger; /** * Runs tasks in separate processes using {@link ExecutorMain}. @@ -79,7 +75,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider private final ListeningExecutorService exec; private final ObjectMapper jsonMapper; - private final Map tasks = Maps.newHashMap(); + private final Map tasks = Maps.newHashMap(); public ForkingTaskRunner( ForkingTaskRunnerConfig config, @@ -109,7 +105,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider if (!tasks.containsKey(task.getId())) { tasks.put( task.getId(), - new TaskInfo( + new ForkingTaskRunnerWorkItem( + task, exec.submit( new Callable() { @@ -135,17 +132,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider // time to adjust process holders synchronized (tasks) { - final TaskInfo taskInfo = tasks.get(task.getId()); + final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId()); - if (taskInfo.shutdown) { + if (taskWorkItem.shutdown) { throw new IllegalStateException("Task has been shut down!"); } - if (taskInfo == null) { + if (taskWorkItem == null) { throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId()); } - if (taskInfo.processHolder != null) { + if (taskWorkItem.processHolder != null) { throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId()); } @@ -206,13 +203,13 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider jsonMapper.writeValue(taskFile, task); log.info("Running command: %s", Joiner.on(" ").join(command)); - taskInfo.processHolder = new ProcessHolder( + taskWorkItem.processHolder = new ProcessHolder( new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(), logFile, childPort ); - processHolder = taskInfo.processHolder; + processHolder = taskWorkItem.processHolder; processHolder.registerWithCloser(closer); } @@ -261,9 +258,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider finally { try { synchronized (tasks) { - final TaskInfo taskInfo = tasks.remove(task.getId()); - if (taskInfo != null && taskInfo.processHolder != null) { - taskInfo.processHolder.process.destroy(); + final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId()); + if (taskWorkItem != null && taskWorkItem.processHolder != null) { + taskWorkItem.processHolder.process.destroy(); } } @@ -281,7 +278,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider ); } - return tasks.get(task.getId()).statusFuture; + return tasks.get(task.getId()).getResult(); } } @@ -291,10 +288,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider synchronized (tasks) { exec.shutdown(); - for (TaskInfo taskInfo : tasks.values()) { - if (taskInfo.processHolder != null) { - log.info("Destroying process: %s", taskInfo.processHolder.process); - taskInfo.processHolder.process.destroy(); + for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { + if (taskWorkItem.processHolder != null) { + log.info("Destroying process: %s", taskWorkItem.processHolder.process); + taskWorkItem.processHolder.process.destroy(); } } } @@ -303,7 +300,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider @Override public void shutdown(final String taskid) { - final TaskInfo taskInfo; + final ForkingTaskRunnerWorkItem taskInfo; synchronized (tasks) { taskInfo = tasks.get(taskid); @@ -326,13 +323,29 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider @Override public Collection getRunningTasks() { - return ImmutableList.of(); + synchronized (tasks) { + final List ret = Lists.newArrayList(); + for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { + if (taskWorkItem.processHolder != null) { + ret.add(taskWorkItem); + } + } + return ret; + } } @Override public Collection getPendingTasks() { - return ImmutableList.of(); + synchronized (tasks) { + final List ret = Lists.newArrayList(); + for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { + if (taskWorkItem.processHolder == null) { + ret.add(taskWorkItem); + } + } + return ret; + } } @Override @@ -347,9 +360,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider final ProcessHolder processHolder; synchronized (tasks) { - final TaskInfo taskInfo = tasks.get(taskid); - if (taskInfo != null && taskInfo.processHolder != null) { - processHolder = taskInfo.processHolder; + final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(taskid); + if (taskWorkItem != null && taskWorkItem.processHolder != null) { + processHolder = taskWorkItem.processHolder; } else { return Optional.absent(); } @@ -380,13 +393,13 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider int port = config.getStartPort(); int maxPortSoFar = -1; - for (TaskInfo taskInfo : tasks.values()) { - if (taskInfo.processHolder != null) { - if (taskInfo.processHolder.port > maxPortSoFar) { - maxPortSoFar = taskInfo.processHolder.port; + for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { + if (taskWorkItem.processHolder != null) { + if (taskWorkItem.processHolder.port > maxPortSoFar) { + maxPortSoFar = taskWorkItem.processHolder.port; } - if (taskInfo.processHolder.port == port) { + if (taskWorkItem.processHolder.port == port) { port = maxPortSoFar + 1; } } @@ -396,15 +409,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider } } - private static class TaskInfo + private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem { - private final ListenableFuture statusFuture; private volatile boolean shutdown = false; private volatile ProcessHolder processHolder = null; - private TaskInfo(ListenableFuture statusFuture) + private ForkingTaskRunnerWorkItem( + Task task, + ListenableFuture statusFuture + ) { - this.statusFuture = statusFuture; + super(task, statusFuture); } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java index 10a4ff5d1a3..e8f341284e1 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java @@ -35,8 +35,7 @@ public class TaskRunnerWorkItem implements Comparable private final Task task; private final ListenableFuture result; private final DateTime createdTime; - - private volatile DateTime queueInsertionTime; + private final DateTime queueInsertionTime; public TaskRunnerWorkItem( Task task, From 455645e72310a14ccff623e1b1d8e4baabffe1ca Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 20 Aug 2013 16:14:36 -0700 Subject: [PATCH 2/3] Workers announce TaskAnnouncement rather than TaskStatus --- .../druid/indexing/common/TaskStatus.java | 29 ++++---- .../coordinator/RemoteTaskRunner.java | 7 +- .../druid/indexing/coordinator/ZkWorker.java | 25 +++---- .../indexing/worker/TaskAnnouncement.java | 68 +++++++++++++++++++ .../worker/WorkerCuratorCoordinator.java | 18 ++--- .../indexing/worker/WorkerTaskMonitor.java | 9 ++- .../SimpleResourceManagementStrategyTest.java | 5 +- .../indexing/worker/TaskAnnouncementTest.java | 59 ++++++++++++++++ 8 files changed, 175 insertions(+), 45 deletions(-) create mode 100644 indexing-service/src/main/java/com/metamx/druid/indexing/worker/TaskAnnouncement.java create mode 100644 indexing-service/src/test/java/com/metamx/druid/indexing/worker/TaskAnnouncementTest.java diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java index d83b7d0de40..bb7c6c4a10f 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java @@ -27,8 +27,8 @@ import com.google.common.base.Preconditions; import com.metamx.druid.indexing.common.task.TaskResource; /** - * Represents the status of a task. The task may be ongoing ({@link #isComplete()} false) or it may be - * complete ({@link #isComplete()} true). + * Represents the status of a task from the perspective of the coordinator. The task may be ongoing + * ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true). *

* TaskStatus objects are immutable. */ @@ -43,36 +43,38 @@ public class TaskStatus public static TaskStatus running(String taskId) { - return new TaskStatus(taskId, Status.RUNNING, -1, null); + return new TaskStatus(taskId, Status.RUNNING, -1); } public static TaskStatus success(String taskId) { - return new TaskStatus(taskId, Status.SUCCESS, -1, null); + return new TaskStatus(taskId, Status.SUCCESS, -1); } public static TaskStatus failure(String taskId) { - return new TaskStatus(taskId, Status.FAILED, -1, null); + return new TaskStatus(taskId, Status.FAILED, -1); + } + + public static TaskStatus fromCode(String taskId, Status code) + { + return new TaskStatus(taskId, code, -1); } private final String id; private final Status status; private final long duration; - private final TaskResource resource; @JsonCreator private TaskStatus( @JsonProperty("id") String id, @JsonProperty("status") Status status, - @JsonProperty("duration") long duration, - @JsonProperty("resource") TaskResource resource + @JsonProperty("duration") long duration ) { this.id = id; this.status = status; this.duration = duration; - this.resource = resource == null ? new TaskResource(id, 1) : resource; // Check class invariants. Preconditions.checkNotNull(id, "id"); @@ -97,12 +99,6 @@ public class TaskStatus return duration; } - @JsonProperty("resource") - public TaskResource getResource() - { - return resource; - } - /** * Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable, * isSuccess, or isFailure will be true at any one time. @@ -144,7 +140,7 @@ public class TaskStatus public TaskStatus withDuration(long _duration) { - return new TaskStatus(id, status, _duration, resource); + return new TaskStatus(id, status, _duration); } @Override @@ -154,7 +150,6 @@ public class TaskStatus .add("id", id) .add("status", status) .add("duration", duration) - .add("resource", resource) .toString(); } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java index 8720525ae1e..f7a81a8af64 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java @@ -41,6 +41,7 @@ import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider; import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; +import com.metamx.druid.indexing.worker.TaskAnnouncement; import com.metamx.druid.indexing.worker.Worker; import com.metamx.emitter.EmittingLogger; import com.metamx.http.client.HttpClient; @@ -291,9 +292,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider runningTasks.remove(task.getId()); } else { log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost()); - TaskStatus status = zkWorker.getRunningTasks().get(task.getId()); - if (status.isComplete()) { - taskComplete(runningTask, zkWorker, task.getId(), status); + TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId()); + if (announcement.getTaskStatus().isComplete()) { + taskComplete(runningTask, zkWorker, task.getId(), announcement.getTaskStatus()); } return runningTask.getResult(); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java index eb23c429a8a..c1e33525e23 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java @@ -28,6 +28,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.task.Task; +import com.metamx.druid.indexing.worker.TaskAnnouncement; import com.metamx.druid.indexing.worker.Worker; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; @@ -48,7 +49,7 @@ public class ZkWorker implements Closeable { private final Worker worker; private final PathChildrenCache statusCache; - private final Function cacheConverter; + private final Function cacheConverter; private AtomicReference lastCompletedTaskTime = new AtomicReference(new DateTime()); @@ -56,13 +57,13 @@ public class ZkWorker implements Closeable { this.worker = worker; this.statusCache = statusCache; - this.cacheConverter = new Function() + this.cacheConverter = new Function() { @Override - public TaskStatus apply(ChildData input) + public TaskAnnouncement apply(ChildData input) { try { - return jsonMapper.readValue(input.getData(), TaskStatus.class); + return jsonMapper.readValue(input.getData(), TaskAnnouncement.class); } catch (Exception e) { throw Throwables.propagate(e); @@ -93,14 +94,14 @@ public class ZkWorker implements Closeable return getRunningTasks().keySet(); } - public Map getRunningTasks() + public Map getRunningTasks() { - Map retVal = Maps.newHashMap(); - for (TaskStatus taskStatus : Lists.transform( + Map retVal = Maps.newHashMap(); + for (TaskAnnouncement taskAnnouncement : Lists.transform( statusCache.getCurrentData(), cacheConverter )) { - retVal.put(taskStatus.getId(), taskStatus); + retVal.put(taskAnnouncement.getTaskStatus().getId(), taskAnnouncement); } return retVal; @@ -110,8 +111,8 @@ public class ZkWorker implements Closeable public int getCurrCapacityUsed() { int currCapacity = 0; - for (TaskStatus taskStatus : getRunningTasks().values()) { - currCapacity += taskStatus.getResource().getRequiredCapacity(); + for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) { + currCapacity += taskAnnouncement.getTaskResource().getRequiredCapacity(); } return currCapacity; } @@ -120,8 +121,8 @@ public class ZkWorker implements Closeable public Set getAvailabilityGroups() { Set retVal = Sets.newHashSet(); - for (TaskStatus taskStatus : getRunningTasks().values()) { - retVal.add(taskStatus.getResource().getAvailabilityGroup()); + for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) { + retVal.add(taskAnnouncement.getTaskResource().getAvailabilityGroup()); } return retVal; } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/TaskAnnouncement.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/TaskAnnouncement.java new file mode 100644 index 00000000000..1af8d6d47cd --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/TaskAnnouncement.java @@ -0,0 +1,68 @@ +package com.metamx.druid.indexing.worker; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.metamx.druid.indexing.common.TaskStatus; +import com.metamx.druid.indexing.common.task.Task; +import com.metamx.druid.indexing.common.task.TaskResource; + +/** + * Used by workers to announce the status of tasks they are currently running. This class is immutable. + */ +public class TaskAnnouncement +{ + private final TaskStatus taskStatus; + private final TaskResource taskResource; + + public static TaskAnnouncement create(Task task, TaskStatus status) + { + Preconditions.checkArgument(status.getId().equals(task.getId()), "task id == status id"); + return new TaskAnnouncement(null, null, status, task.getTaskResource()); + } + + @JsonCreator + private TaskAnnouncement( + @JsonProperty("id") String taskId, + @JsonProperty("status") TaskStatus.Status status, + @JsonProperty("taskStatus") TaskStatus taskStatus, + @JsonProperty("taskResource") TaskResource taskResource + ) + { + if (taskStatus != null) { + this.taskStatus = taskStatus; + } else { + // Can be removed when backwards compat is no longer needed + this.taskStatus = TaskStatus.fromCode(taskId, status); + } + this.taskResource = taskResource == null ? new TaskResource(this.taskStatus.getId(), 1) : taskResource; + } + + // Can be removed when backwards compat is no longer needed + @JsonProperty("id") + @Deprecated + public String getTaskId() + { + return taskStatus.getId(); + } + + // Can be removed when backwards compat is no longer needed + @JsonProperty("status") + @Deprecated + public TaskStatus.Status getStatus() + { + return taskStatus.getStatusCode(); + } + + @JsonProperty("taskStatus") + public TaskStatus getTaskStatus() + { + return taskStatus; + } + + @JsonProperty("taskResource") + public TaskResource getTaskResource() + { + return taskResource; + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java index 89d6c313487..8c597bba6e9 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -180,7 +180,7 @@ public class WorkerCuratorCoordinator } } - public void announceStatus(TaskStatus status) + public void announceTask(TaskAnnouncement announcement) { synchronized (lock) { if (!started) { @@ -188,7 +188,7 @@ public class WorkerCuratorCoordinator } try { - byte[] rawBytes = jsonMapper.writeValueAsBytes(status); + byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement); if (rawBytes.length > config.getMaxNumBytes()) { throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes()); } @@ -196,7 +196,7 @@ public class WorkerCuratorCoordinator curatorFramework.create() .withMode(CreateMode.EPHEMERAL) .forPath( - getStatusPathForId(status.getId()), rawBytes + getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes ); } catch (Exception e) { @@ -205,7 +205,7 @@ public class WorkerCuratorCoordinator } } - public void updateStatus(TaskStatus status) + public void updateAnnouncement(TaskAnnouncement announcement) { synchronized (lock) { if (!started) { @@ -213,18 +213,18 @@ public class WorkerCuratorCoordinator } try { - if (curatorFramework.checkExists().forPath(getStatusPathForId(status.getId())) == null) { - announceStatus(status); + if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) { + announceTask(announcement); return; } - byte[] rawBytes = jsonMapper.writeValueAsBytes(status); + byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement); if (rawBytes.length > config.getMaxNumBytes()) { throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes()); } curatorFramework.setData() .forPath( - getStatusPathForId(status.getId()), rawBytes + getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes ); } catch (Exception e) { @@ -232,4 +232,4 @@ public class WorkerCuratorCoordinator } } } -} \ No newline at end of file +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerTaskMonitor.java index 584ec0a2ef6..ed3e3e993b2 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerTaskMonitor.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerTaskMonitor.java @@ -118,7 +118,12 @@ public class WorkerTaskMonitor TaskStatus taskStatus; try { workerCuratorCoordinator.unannounceTask(task.getId()); - workerCuratorCoordinator.announceStatus(TaskStatus.running(task.getId())); + workerCuratorCoordinator.announceTask( + TaskAnnouncement.create( + task, + TaskStatus.running(task.getId()) + ) + ); taskStatus = taskRunner.run(task).get(); } catch (Exception e) { @@ -134,7 +139,7 @@ public class WorkerTaskMonitor taskStatus = taskStatus.withDuration(System.currentTimeMillis() - startTime); try { - workerCuratorCoordinator.updateStatus(taskStatus); + workerCuratorCoordinator.updateAnnouncement(TaskAnnouncement.create(task, taskStatus)); log.info( "Job's finished. Completed [%s] with status [%s]", task.getId(), diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java index f73d289526d..07d8346f65e 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -31,6 +31,7 @@ import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem; import com.metamx.druid.indexing.coordinator.ZkWorker; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; +import com.metamx.druid.indexing.worker.TaskAnnouncement; import com.metamx.druid.indexing.worker.Worker; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.emitter.EmittingLogger; @@ -351,12 +352,12 @@ public class SimpleResourceManagementStrategyTest } @Override - public Map getRunningTasks() + public Map getRunningTasks() { if (testTask == null) { return Maps.newHashMap(); } - return ImmutableMap.of(testTask.getId(), TaskStatus.running(testTask.getId())); + return ImmutableMap.of(testTask.getId(), TaskAnnouncement.create(testTask, TaskStatus.running(testTask.getId()))); } } } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/worker/TaskAnnouncementTest.java new file mode 100644 index 00000000000..5202609f42d --- /dev/null +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/worker/TaskAnnouncementTest.java @@ -0,0 +1,59 @@ +package com.metamx.druid.indexing.worker; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.druid.QueryGranularity; +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.index.v1.IndexGranularity; +import com.metamx.druid.indexing.common.TaskStatus; +import com.metamx.druid.indexing.common.task.RealtimeIndexTask; +import com.metamx.druid.indexing.common.task.Task; +import com.metamx.druid.indexing.common.task.TaskResource; +import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.realtime.Schema; +import com.metamx.druid.shard.NoneShardSpec; +import junit.framework.Assert; +import org.joda.time.Period; +import org.junit.Test; + +public class TaskAnnouncementTest +{ + @Test + public void testBackwardsCompatibleSerde() throws Exception + { + final Task task = new RealtimeIndexTask( + "theid", + new TaskResource("rofl", 2), + new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()), + null, + null, + new Period("PT10M"), + IndexGranularity.HOUR, + null + ); + final TaskStatus status = TaskStatus.running(task.getId()); + final TaskAnnouncement announcement = TaskAnnouncement.create(task, status); + + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final String statusJson = jsonMapper.writeValueAsString(status); + final String announcementJson = jsonMapper.writeValueAsString(announcement); + + final TaskStatus statusFromStatus = jsonMapper.readValue(statusJson, TaskStatus.class); + final TaskStatus statusFromAnnouncement = jsonMapper.readValue(announcementJson, TaskStatus.class); + final TaskAnnouncement announcementFromStatus = jsonMapper.readValue(statusJson, TaskAnnouncement.class); + final TaskAnnouncement announcementFromAnnouncement = jsonMapper.readValue( + announcementJson, + TaskAnnouncement.class + ); + + Assert.assertEquals("theid", statusFromStatus.getId()); + Assert.assertEquals("theid", statusFromAnnouncement.getId()); + Assert.assertEquals("theid", announcementFromStatus.getTaskStatus().getId()); + Assert.assertEquals("theid", announcementFromAnnouncement.getTaskStatus().getId()); + + Assert.assertEquals("theid", announcementFromStatus.getTaskResource().getAvailabilityGroup()); + Assert.assertEquals("rofl", announcementFromAnnouncement.getTaskResource().getAvailabilityGroup()); + + Assert.assertEquals(1, announcementFromStatus.getTaskResource().getRequiredCapacity()); + Assert.assertEquals(2, announcementFromAnnouncement.getTaskResource().getRequiredCapacity()); + } +} From 70ab225770733beff3528173be7774454ef155ea Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 20 Aug 2013 17:50:10 -0700 Subject: [PATCH 3/3] Add missing license headers --- .../indexing/worker/TaskAnnouncement.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/TaskAnnouncement.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/TaskAnnouncement.java index 1af8d6d47cd..a02093dbae5 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/TaskAnnouncement.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/TaskAnnouncement.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.indexing.worker; import com.fasterxml.jackson.annotation.JsonCreator;