diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java
index 679faaf3191..fb15cd75354 100644
--- a/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java
@@ -297,18 +297,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
if (runningTask != null) {
ZkWorker zkWorker = findWorkerRunningTask(task.getId());
if (zkWorker == null) {
- log.makeAlert("Told to run task that is in the running queue but no worker is actually running it?!")
- .addData("taskId", task.getId())
- .emit();
- runningTasks.remove(task.getId());
+ log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());
} else {
log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
if (announcement.getTaskStatus().isComplete()) {
taskComplete(runningTask, zkWorker, task.getId(), announcement.getTaskStatus());
}
- return runningTask.getResult();
}
+
+ return runningTask.getResult();
}
RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId());
@@ -552,12 +550,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
timeoutStopwatch.start();
synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker, task)) {
- statusLock.wait(config.getTaskAssignmentTimeout().getMillis());
- if (timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS) >= config.getTaskAssignmentTimeout().getMillis()) {
+ final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
+ statusLock.wait(waitMs);
+ long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
+ if (elapsed >= waitMs) {
log.error(
- "Something went wrong! %s never ran task %s after %s!",
+ "Something went wrong! [%s] never ran task [%s]! Timeout: (%s >= %s)!",
theWorker.getHost(),
task.getId(),
+ elapsed,
config.getTaskAssignmentTimeout()
);
diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java
index f4fe25d8c3b..812eee3d4cb 100644
--- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java
+++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java
@@ -183,7 +183,7 @@ public class WorkerCuratorCoordinator
}
}
- public void announceTask(TaskAnnouncement announcement)
+ public void announceTastAnnouncement(TaskAnnouncement announcement)
{
synchronized (lock) {
if (!started) {
@@ -219,7 +219,7 @@ public class WorkerCuratorCoordinator
try {
if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) {
- announceTask(announcement);
+ announceTastAnnouncement(announcement);
return;
}
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java
index af28f2402a4..c1382c994e6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java
+++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java
@@ -42,7 +42,7 @@ import java.util.concurrent.ExecutorService;
* The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be
* created that waits for new tasks. Tasks are executed as soon as they are seen.
*
- * The monitor implements {@link QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
+ * The monitor implements {@link io.druid.query.QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
* realtime index tasks.
*/
public class WorkerTaskMonitor
@@ -122,7 +122,7 @@ public class WorkerTaskMonitor
TaskStatus taskStatus;
try {
workerCuratorCoordinator.unannounceTask(task.getId());
- workerCuratorCoordinator.announceTask(
+ workerCuratorCoordinator.announceTastAnnouncement(
TaskAnnouncement.create(
task,
TaskStatus.running(task.getId())
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/IndexingServiceCondition.java b/indexing-service/src/test/java/io/druid/indexing/common/IndexingServiceCondition.java
new file mode 100644
index 00000000000..6157e69a6ce
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/common/IndexingServiceCondition.java
@@ -0,0 +1,27 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.common;
+
+/**
+ */
+public interface IndexingServiceCondition
+{
+ public boolean isValid();
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/TestTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestMergeTask.java
similarity index 66%
rename from indexing-service/src/test/java/io/druid/indexing/TestTask.java
rename to indexing-service/src/test/java/io/druid/indexing/common/TestMergeTask.java
index f8e13043872..906e6e6c1e6 100644
--- a/indexing-service/src/test/java/io/druid/indexing/TestTask.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TestMergeTask.java
@@ -17,37 +17,59 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
-package io.druid.indexing;
+package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import io.druid.indexing.common.TaskStatus;
-import io.druid.indexing.common.TaskToolbox;
+import com.google.common.collect.Lists;
import io.druid.indexing.common.task.MergeTask;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
import java.util.List;
/**
*/
@JsonTypeName("test")
-public class TestTask extends MergeTask
+public class TestMergeTask extends MergeTask
{
- private final TaskStatus status;
+ public static TestMergeTask createDummyTask(String taskId)
+ {
+ return new TestMergeTask(
+ taskId,
+ "dummyDs",
+ Lists.newArrayList(
+ new DataSegment(
+ "dummyDs",
+ new Interval(new DateTime(), new DateTime()),
+ new DateTime().toString(),
+ null,
+ null,
+ null,
+ null,
+ 0,
+ 0
+ )
+ ),
+ Lists.newArrayList()
+ );
+ }
+
+ private final String id;
@JsonCreator
- public TestTask(
+ public TestMergeTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List segments,
- @JsonProperty("aggregations") List aggregators,
- @JsonProperty("taskStatus") TaskStatus status
+ @JsonProperty("aggregations") List aggregators
)
{
super(id, dataSource, segments, aggregators);
- this.status = status;
+ this.id = id;
}
@Override
@@ -57,15 +79,9 @@ public class TestTask extends MergeTask
return "test";
}
- @JsonProperty
- public TaskStatus getStatus()
- {
- return status;
- }
-
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
- return status;
+ return TaskStatus.running(id);
}
}
diff --git a/indexing-service/src/test/java/io/druid/indexing/coordinator/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
similarity index 92%
rename from indexing-service/src/test/java/io/druid/indexing/coordinator/TestRealtimeTask.java
rename to indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
index 22f36bc5098..cc69067d23c 100644
--- a/indexing-service/src/test/java/io/druid/indexing/coordinator/TestRealtimeTask.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
@@ -17,14 +17,12 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
-package io.druid.indexing.coordinator;
+package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.granularity.QueryGranularity;
-import io.druid.indexing.common.TaskStatus;
-import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.query.aggregation.AggregatorFactory;
@@ -34,7 +32,7 @@ import io.druid.timeline.partition.NoneShardSpec;
/**
*/
@JsonTypeName("test_realtime")
-public class TestRealtimeTask extends RealtimeIndexTask
+public class TestRealtimeTask extends RealtimeIndexTask implements TestTask
{
private final TaskStatus status;
@@ -66,6 +64,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
return "test_realtime";
}
+ @Override
@JsonProperty
public TaskStatus getStatus()
{
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestTask.java
new file mode 100644
index 00000000000..fa0e9072c84
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TestTask.java
@@ -0,0 +1,30 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.common;
+
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.task.Task;
+
+/**
+ */
+public interface TestTask extends Task
+{
+ public TaskStatus getStatus();
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java
new file mode 100644
index 00000000000..2ec6c474e6c
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.common;
+
+import com.google.common.base.Stopwatch;
+import com.metamx.common.ISE;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class TestUtils
+{
+ public static boolean conditionValid(IndexingServiceCondition condition)
+ {
+ try {
+ Stopwatch stopwatch = new Stopwatch();
+ stopwatch.start();
+ while (!condition.isValid()) {
+ Thread.sleep(100);
+ if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
+ throw new ISE("Cannot find running task");
+ }
+ }
+ }
+ catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/coordinator/RemoteTaskRunnerTest.java
index 47bfce6157b..e9eebfaba67 100644
--- a/indexing-service/src/test/java/io/druid/indexing/coordinator/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/coordinator/RemoteTaskRunnerTest.java
@@ -20,62 +20,46 @@
package io.druid.indexing.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
-import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
-import io.druid.indexing.TestTask;
+import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.TaskStatus;
-import io.druid.indexing.common.TaskToolboxFactory;
-import io.druid.indexing.common.config.TaskConfig;
+import io.druid.indexing.common.TestMergeTask;
+import io.druid.indexing.common.TestRealtimeTask;
+import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
-import io.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import io.druid.indexing.coordinator.setup.WorkerSetupData;
+import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
-import io.druid.indexing.worker.WorkerCuratorCoordinator;
-import io.druid.indexing.worker.WorkerTaskMonitor;
-import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.jackson.DefaultObjectMapper;
-import io.druid.query.aggregation.AggregatorFactory;
import io.druid.server.initialization.ZkPathsConfig;
-import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.File;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-/**
- * Several of the tests here are integration tests rather than unit tests. We will introduce real unit tests for this
- * class as well as integration tests in the very near future.
- */
public class RemoteTaskRunnerTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@@ -88,10 +72,8 @@ public class RemoteTaskRunnerTest
private TestingCluster testingCluster;
private CuratorFramework cf;
private RemoteTaskRunner remoteTaskRunner;
- private WorkerCuratorCoordinator workerCuratorCoordinator;
- private WorkerTaskMonitor workerTaskMonitor;
- private TestTask task;
+ private TestMergeTask task;
private Worker worker;
@@ -108,26 +90,77 @@ public class RemoteTaskRunnerTest
.build();
cf.start();
cf.create().creatingParentsIfNeeded().forPath(basePath);
+ cf.create().creatingParentsIfNeeded().forPath(tasksPath);
+ cf.create().creatingParentsIfNeeded().forPath(statusPath);
- task = makeTask(TaskStatus.success("task"));
+
+ task = TestMergeTask.createDummyTask("task");
}
@After
public void tearDown() throws Exception
{
remoteTaskRunner.stop();
- workerCuratorCoordinator.stop();
- workerTaskMonitor.stop();
cf.close();
testingCluster.stop();
}
@Test
- public void testRunNoExistingTask() throws Exception
+ public void testRun() throws Exception
+ {
+ doSetup();
+
+ ListenableFuture result = remoteTaskRunner.run(task);
+
+ Assert.assertTrue(taskAnnounced(task.getId()));
+ mockWorkerRunningTask(task);
+ Assert.assertTrue(workerRunningTask(task.getId()));
+ mockWorkerCompleteSuccessfulTask(task);
+ Assert.assertTrue(workerCompletedTask(result));
+
+ Assert.assertEquals(task.getId(), result.get().getId());
+ Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
+ }
+
+ @Test
+ public void testRunExistingTaskThatHasntStartedRunning() throws Exception
{
doSetup();
remoteTaskRunner.run(task);
+ Assert.assertTrue(taskAnnounced(task.getId()));
+
+ ListenableFuture result = remoteTaskRunner.run(task);
+
+ Assert.assertFalse(result.isDone());
+ mockWorkerRunningTask(task);
+ Assert.assertTrue(workerRunningTask(task.getId()));
+ mockWorkerCompleteSuccessfulTask(task);
+ Assert.assertTrue(workerCompletedTask(result));
+
+ Assert.assertEquals(task.getId(), result.get().getId());
+ Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
+ }
+
+ @Test
+ public void testRunExistingTaskThatHasStartedRunning() throws Exception
+ {
+ doSetup();
+
+ remoteTaskRunner.run(task);
+ Assert.assertTrue(taskAnnounced(task.getId()));
+ mockWorkerRunningTask(task);
+ Assert.assertTrue(workerRunningTask(task.getId()));
+
+ ListenableFuture result = remoteTaskRunner.run(task);
+
+ Assert.assertFalse(result.isDone());
+
+ mockWorkerCompleteSuccessfulTask(task);
+ Assert.assertTrue(workerCompletedTask(result));
+
+ Assert.assertEquals(task.getId(), result.get().getId());
+ Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
}
@Test
@@ -139,7 +172,7 @@ public class RemoteTaskRunnerTest
doSetup();
- remoteTaskRunner.run(makeTask(TaskStatus.success(new String(new char[5000]))));
+ remoteTaskRunner.run(TestMergeTask.createDummyTask(new String(new char[5000])));
EasyMock.verify(emitter);
}
@@ -149,31 +182,43 @@ public class RemoteTaskRunnerTest
{
doSetup();
- TestRealtimeTask theTask = new TestRealtimeTask(
- "rt1",
- new TaskResource("rt1", 1),
- "foo",
- TaskStatus.running("rt1")
- );
- remoteTaskRunner.run(theTask);
- remoteTaskRunner.run(
- new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"))
- );
- remoteTaskRunner.run(
- new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"))
+ TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"));
+ remoteTaskRunner.run(task1);
+ Assert.assertTrue(taskAnnounced(task1.getId()));
+ mockWorkerRunningTask(task1);
+
+ TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"));
+ remoteTaskRunner.run(task2);
+
+ TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"));
+ remoteTaskRunner.run(task3);
+
+ Assert.assertTrue(
+ TestUtils.conditionValid(
+ new IndexingServiceCondition()
+ {
+ @Override
+ public boolean isValid()
+ {
+ return remoteTaskRunner.getRunningTasks().size() == 2;
+ }
+ }
+ )
);
- Stopwatch stopwatch = new Stopwatch();
- stopwatch.start();
- while (remoteTaskRunner.getRunningTasks().size() < 2) {
- Thread.sleep(100);
- if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
- throw new ISE("Cannot find running task");
- }
- }
+ Assert.assertTrue(
+ TestUtils.conditionValid(
+ new IndexingServiceCondition()
+ {
+ @Override
+ public boolean isValid()
+ {
+ return remoteTaskRunner.getPendingTasks().size() == 1;
+ }
+ }
+ )
+ );
- Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2);
- Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2"));
}
@@ -182,53 +227,62 @@ public class RemoteTaskRunnerTest
{
doSetup();
- TestRealtimeTask theTask = new TestRealtimeTask(
- "rt1",
- new TaskResource("rt1", 1),
- "foo",
- TaskStatus.running("rt1")
- );
- remoteTaskRunner.run(theTask);
- remoteTaskRunner.run(
- new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"))
- );
- remoteTaskRunner.run(
- new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"))
+ TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"));
+ remoteTaskRunner.run(task1);
+ Assert.assertTrue(taskAnnounced(task1.getId()));
+ mockWorkerRunningTask(task1);
+
+ TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"));
+ remoteTaskRunner.run(task2);
+
+ TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"));
+ remoteTaskRunner.run(task3);
+ Assert.assertTrue(taskAnnounced(task3.getId()));
+ mockWorkerRunningTask(task3);
+
+ Assert.assertTrue(
+ TestUtils.conditionValid(
+ new IndexingServiceCondition()
+ {
+ @Override
+ public boolean isValid()
+ {
+ return remoteTaskRunner.getRunningTasks().size() == 2;
+ }
+ }
+ )
);
- Stopwatch stopwatch = new Stopwatch();
- stopwatch.start();
- while (remoteTaskRunner.getRunningTasks().size() < 2) {
- Thread.sleep(100);
- if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
- throw new ISE("Cannot find running task");
- }
- }
+ Assert.assertTrue(
+ TestUtils.conditionValid(
+ new IndexingServiceCondition()
+ {
+ @Override
+ public boolean isValid()
+ {
+ return remoteTaskRunner.getPendingTasks().size() == 1;
+ }
+ }
+ )
+ );
- Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2);
- Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2"));
}
@Test
- public void testFailure() throws Exception
+ public void testStatusRemoved() throws Exception
{
doSetup();
- ListenableFuture future = remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
- final String taskStatus = joiner.join(statusPath, "task");
+ ListenableFuture future = remoteTaskRunner.run(task);
+ Assert.assertTrue(taskAnnounced(task.getId()));
+ mockWorkerRunningTask(task);
+
+ Assert.assertTrue(workerRunningTask(task.getId()));
- Stopwatch stopwatch = new Stopwatch();
- stopwatch.start();
- while (cf.checkExists().forPath(taskStatus) == null) {
- Thread.sleep(100);
- if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
- throw new ISE("Cannot find running task");
- }
- }
Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTask().getId().equals("task"));
- cf.delete().forPath(taskStatus);
+ cf.delete().forPath(joiner.join(statusPath, task.getId()));
TaskStatus status = future.get();
@@ -258,7 +312,7 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(existingTasks.contains("first"));
Assert.assertTrue(existingTasks.contains("second"));
- remoteTaskRunner.bootstrap(Arrays.asList(makeTask(TaskStatus.running("second"))));
+ remoteTaskRunner.bootstrap(Arrays.asList(TestMergeTask.createDummyTask("second")));
Set runningTasks = Sets.newHashSet(
Iterables.transform(
@@ -303,18 +357,14 @@ public class RemoteTaskRunnerTest
{
doSetup();
remoteTaskRunner.bootstrap(Lists.newArrayList());
- Future future = remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
+ Future future = remoteTaskRunner.run(task);
- Stopwatch stopwatch = new Stopwatch();
- stopwatch.start();
- while (cf.checkExists().forPath(joiner.join(statusPath, "task")) == null) {
- Thread.sleep(100);
- if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
- throw new ISE("Cannot find running task");
- }
- }
+ Assert.assertTrue(taskAnnounced(task.getId()));
+ mockWorkerRunningTask(task);
- workerCuratorCoordinator.stop();
+ Assert.assertTrue(workerRunningTask(task.getId()));
+
+ cf.delete().forPath(announcementsPath);
TaskStatus status = future.get();
@@ -325,68 +375,6 @@ public class RemoteTaskRunnerTest
{
makeWorker();
makeRemoteTaskRunner();
- makeTaskMonitor();
- }
-
- private TestTask makeTask(TaskStatus status)
- {
- return new TestTask(
- status.getId(),
- "dummyDs",
- Lists.newArrayList(
- new DataSegment(
- "dummyDs",
- new Interval(new DateTime(), new DateTime()),
- new DateTime().toString(),
- null,
- null,
- null,
- null,
- 0,
- 0
- )
- ),
- Lists.newArrayList(),
- status
- );
- }
-
- private void makeTaskMonitor() throws Exception
- {
- workerCuratorCoordinator = new WorkerCuratorCoordinator(
- jsonMapper,
- new ZkPathsConfig()
- {
- @Override
- public String getZkBasePath()
- {
- return basePath;
- }
- },
- new TestRemoteTaskRunnerConfig(),
- cf,
- worker
- );
- workerCuratorCoordinator.start();
-
- final File tmp = Files.createTempDir();
-
- // Start a task monitor
- workerTaskMonitor = new WorkerTaskMonitor(
- jsonMapper,
- cf,
- workerCuratorCoordinator,
- new ThreadPoolTaskRunner(
- new TaskToolboxFactory(
- new TaskConfig(tmp.toString(), null, null, 0),
- null, null, null, null, null, null, null, null, null, jsonMapper
- )
- ),
- new WorkerConfig().setCapacity(1)
- );
- jsonMapper.registerSubtypes(new NamedType(TestTask.class, "test"));
- jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime"));
- workerTaskMonitor.start();
}
private void makeRemoteTaskRunner() throws Exception
@@ -426,30 +414,60 @@ public class RemoteTaskRunnerTest
);
}
- private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
+ private boolean taskAnnounced(final String taskId)
{
- @Override
- public boolean isCompressZnodes()
- {
- return false;
- }
+ return pathExists(joiner.join(tasksPath, taskId));
+ }
- @Override
- public Period getTaskAssignmentTimeout()
- {
- return new Period(60000);
- }
+ private boolean workerRunningTask(final String taskId)
+ {
+ return pathExists(joiner.join(statusPath, taskId));
+ }
- @Override
- public long getMaxZnodeBytes()
- {
- return 1000;
- }
+ private boolean pathExists(final String path)
+ {
+ return TestUtils.conditionValid(
+ new IndexingServiceCondition()
+ {
+ @Override
+ public boolean isValid()
+ {
+ try {
+ return cf.checkExists().forPath(path) != null;
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+ );
+ }
- @Override
- public String getWorkerVersion()
- {
- return "";
- }
+ private boolean workerCompletedTask(final ListenableFuture result)
+ {
+ return TestUtils.conditionValid(
+ new IndexingServiceCondition()
+ {
+ @Override
+ public boolean isValid()
+ {
+ return result.isDone();
+ }
+ }
+ );
+ }
+
+ private void mockWorkerRunningTask(final Task task) throws Exception
+ {
+ cf.delete().forPath(joiner.join(tasksPath, task.getId()));
+
+ TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId()));
+ cf.create().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
+ }
+
+ private void mockWorkerCompleteSuccessfulTask(final Task task) throws Exception
+ {
+ TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId()));
+ cf.setData().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
}
}
diff --git a/indexing-service/src/test/java/io/druid/indexing/coordinator/TestRemoteTaskRunnerConfig.java b/indexing-service/src/test/java/io/druid/indexing/coordinator/TestRemoteTaskRunnerConfig.java
new file mode 100644
index 00000000000..c006535251a
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/coordinator/TestRemoteTaskRunnerConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.coordinator;
+
+import io.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
+import org.joda.time.Period;
+
+/**
+ */
+public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
+{
+ @Override
+ public boolean isCompressZnodes()
+ {
+ return false;
+ }
+
+ @Override
+ public Period getTaskAssignmentTimeout()
+ {
+ return new Period("PT1S");
+ }
+
+ @Override
+ public long getMaxZnodeBytes()
+ {
+ return 1000;
+ }
+
+ @Override
+ public String getWorkerVersion()
+ {
+ return "";
+ }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java
index cbfcc484f9e..03fd983437c 100644
--- a/indexing-service/src/test/java/io/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java
@@ -26,7 +26,7 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers;
-import io.druid.indexing.TestTask;
+import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
@@ -69,7 +69,7 @@ public class SimpleResourceManagementStrategyTest
)
);
- testTask = new TestTask(
+ testTask = new TestMergeTask(
"task1",
"dummyDs",
Lists.newArrayList(
@@ -85,8 +85,7 @@ public class SimpleResourceManagementStrategyTest
0
)
),
- Lists.newArrayList(),
- TaskStatus.success("task1")
+ Lists.newArrayList()
);
simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
autoScalingStrategy,
diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
new file mode 100644
index 00000000000..399ff4ab05a
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -0,0 +1,189 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.worker;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.base.Joiner;
+import com.google.common.io.Files;
+import io.druid.curator.PotentiallyGzippedCompressionProvider;
+import io.druid.indexing.common.IndexingServiceCondition;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolboxFactory;
+import io.druid.indexing.common.TestMergeTask;
+import io.druid.indexing.common.TestRealtimeTask;
+import io.druid.indexing.common.TestUtils;
+import io.druid.indexing.common.config.TaskConfig;
+import io.druid.indexing.coordinator.TestRemoteTaskRunnerConfig;
+import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
+import io.druid.indexing.worker.config.WorkerConfig;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.server.initialization.ZkPathsConfig;
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+/**
+ */
+public class WorkerTaskMonitorTest
+{
+ private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
+ private static final Joiner joiner = Joiner.on("/");
+ private static final String basePath = "/test/druid";
+ private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath);
+ private static final String statusPath = String.format("%s/indexer/status/worker", basePath);
+
+ private TestingCluster testingCluster;
+ private CuratorFramework cf;
+ private WorkerCuratorCoordinator workerCuratorCoordinator;
+ private WorkerTaskMonitor workerTaskMonitor;
+
+ private TestMergeTask task;
+
+ private Worker worker;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ testingCluster = new TestingCluster(1);
+ testingCluster.start();
+
+ cf = CuratorFrameworkFactory.builder()
+ .connectString(testingCluster.getConnectString())
+ .retryPolicy(new ExponentialBackoffRetry(1, 10))
+ .compressionProvider(new PotentiallyGzippedCompressionProvider(false))
+ .build();
+ cf.start();
+ cf.create().creatingParentsIfNeeded().forPath(basePath);
+ //cf.create().creatingParentsIfNeeded().forPath(tasksPath);
+ //cf.create().creatingParentsIfNeeded().forPath(statusPath);
+
+ worker = new Worker(
+ "worker",
+ "localhost",
+ 3,
+ "0"
+ );
+
+ workerCuratorCoordinator = new WorkerCuratorCoordinator(
+ jsonMapper,
+ new ZkPathsConfig()
+ {
+ @Override
+ public String getZkBasePath()
+ {
+ return basePath;
+ }
+ },
+ new TestRemoteTaskRunnerConfig(),
+ cf,
+ worker
+ );
+ workerCuratorCoordinator.start();
+
+ final File tmp = Files.createTempDir();
+
+ // Start a task monitor
+ workerTaskMonitor = new WorkerTaskMonitor(
+ jsonMapper,
+ cf,
+ workerCuratorCoordinator,
+ new ThreadPoolTaskRunner(
+ new TaskToolboxFactory(
+ new TaskConfig(tmp.toString(), null, null, 0),
+ null, null, null, null, null, null, null, null, null, jsonMapper
+ )
+ ),
+ new WorkerConfig().setCapacity(1)
+ );
+ jsonMapper.registerSubtypes(new NamedType(TestMergeTask.class, "test"));
+ jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime"));
+ workerTaskMonitor.start();
+
+ task = TestMergeTask.createDummyTask("test");
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ workerTaskMonitor.stop();
+ cf.close();
+ testingCluster.stop();
+ }
+
+ @Test
+ public void testRunTask() throws Exception
+ {
+ cf.create()
+ .creatingParentsIfNeeded()
+ .forPath(joiner.join(tasksPath, task.getId()), jsonMapper.writeValueAsBytes(task));
+
+ Assert.assertTrue(
+ TestUtils.conditionValid(
+ new IndexingServiceCondition()
+ {
+ @Override
+ public boolean isValid()
+ {
+ try {
+ return cf.checkExists().forPath(joiner.join(tasksPath, task.getId())) == null;
+ }
+ catch (Exception e) {
+ return false;
+ }
+ }
+ }
+ )
+ );
+
+
+ Assert.assertTrue(
+ TestUtils.conditionValid(
+ new IndexingServiceCondition()
+ {
+ @Override
+ public boolean isValid()
+ {
+ try {
+ return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null;
+ }
+ catch (Exception e) {
+ return false;
+ }
+ }
+ }
+ )
+ );
+
+ TaskAnnouncement taskAnnouncement = jsonMapper.readValue(
+ cf.getData().forPath(joiner.join(statusPath, task.getId())), TaskAnnouncement.class
+ );
+
+ Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId());
+ Assert.assertEquals(TaskStatus.Status.RUNNING, taskAnnouncement.getTaskStatus().getStatusCode());
+ }
+}
\ No newline at end of file