From f7c10e359400a1ac572209e50963f3675021b857 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 12 Sep 2013 16:37:58 -0700 Subject: [PATCH] rework tests in indexing service to be more unit testy --- .../coordinator/RemoteTaskRunner.java | 17 +- .../worker/WorkerCuratorCoordinator.java | 4 +- .../indexing/worker/WorkerTaskMonitor.java | 4 +- .../common/IndexingServiceCondition.java | 27 ++ .../TestMergeTask.java} | 48 ++- .../TestRealtimeTask.java | 7 +- .../io/druid/indexing/common/TestTask.java | 30 ++ .../io/druid/indexing/common/TestUtils.java | 48 +++ .../coordinator/RemoteTaskRunnerTest.java | 378 +++++++++--------- .../TestRemoteTaskRunnerConfig.java | 52 +++ .../SimpleResourceManagementStrategyTest.java | 7 +- .../worker/WorkerTaskMonitorTest.java | 189 +++++++++ 12 files changed, 595 insertions(+), 216 deletions(-) create mode 100644 indexing-service/src/test/java/io/druid/indexing/common/IndexingServiceCondition.java rename indexing-service/src/test/java/io/druid/indexing/{TestTask.java => common/TestMergeTask.java} (66%) rename indexing-service/src/test/java/io/druid/indexing/{coordinator => common}/TestRealtimeTask.java (92%) create mode 100644 indexing-service/src/test/java/io/druid/indexing/common/TestTask.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/coordinator/TestRemoteTaskRunnerConfig.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java index 679faaf3191..fb15cd75354 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java @@ -297,18 +297,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer if (runningTask != null) { ZkWorker zkWorker = findWorkerRunningTask(task.getId()); if (zkWorker == null) { - log.makeAlert("Told to run task that is in the running queue but no worker is actually running it?!") - .addData("taskId", task.getId()) - .emit(); - runningTasks.remove(task.getId()); + log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId()); } else { log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost()); TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId()); if (announcement.getTaskStatus().isComplete()) { taskComplete(runningTask, zkWorker, task.getId(), announcement.getTaskStatus()); } - return runningTask.getResult(); } + + return runningTask.getResult(); } RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId()); @@ -552,12 +550,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer timeoutStopwatch.start(); synchronized (statusLock) { while (!isWorkerRunningTask(theWorker, task)) { - statusLock.wait(config.getTaskAssignmentTimeout().getMillis()); - if (timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS) >= config.getTaskAssignmentTimeout().getMillis()) { + final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis(); + statusLock.wait(waitMs); + long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS); + if (elapsed >= waitMs) { log.error( - "Something went wrong! %s never ran task %s after %s!", + "Something went wrong! [%s] never ran task [%s]! Timeout: (%s >= %s)!", theWorker.getHost(), task.getId(), + elapsed, config.getTaskAssignmentTimeout() ); diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java index f4fe25d8c3b..812eee3d4cb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -183,7 +183,7 @@ public class WorkerCuratorCoordinator } } - public void announceTask(TaskAnnouncement announcement) + public void announceTastAnnouncement(TaskAnnouncement announcement) { synchronized (lock) { if (!started) { @@ -219,7 +219,7 @@ public class WorkerCuratorCoordinator try { if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) { - announceTask(announcement); + announceTastAnnouncement(announcement); return; } byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement); diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java index af28f2402a4..c1382c994e6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java @@ -42,7 +42,7 @@ import java.util.concurrent.ExecutorService; * The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be * created that waits for new tasks. Tasks are executed as soon as they are seen. *

- * The monitor implements {@link QuerySegmentWalker} so tasks can offer up queryable data. This is useful for + * The monitor implements {@link io.druid.query.QuerySegmentWalker} so tasks can offer up queryable data. This is useful for * realtime index tasks. */ public class WorkerTaskMonitor @@ -122,7 +122,7 @@ public class WorkerTaskMonitor TaskStatus taskStatus; try { workerCuratorCoordinator.unannounceTask(task.getId()); - workerCuratorCoordinator.announceTask( + workerCuratorCoordinator.announceTastAnnouncement( TaskAnnouncement.create( task, TaskStatus.running(task.getId()) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/IndexingServiceCondition.java b/indexing-service/src/test/java/io/druid/indexing/common/IndexingServiceCondition.java new file mode 100644 index 00000000000..6157e69a6ce --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/IndexingServiceCondition.java @@ -0,0 +1,27 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.common; + +/** + */ +public interface IndexingServiceCondition +{ + public boolean isValid(); +} diff --git a/indexing-service/src/test/java/io/druid/indexing/TestTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestMergeTask.java similarity index 66% rename from indexing-service/src/test/java/io/druid/indexing/TestTask.java rename to indexing-service/src/test/java/io/druid/indexing/common/TestMergeTask.java index f8e13043872..906e6e6c1e6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/TestTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestMergeTask.java @@ -17,37 +17,59 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing; +package io.druid.indexing.common; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskToolbox; +import com.google.common.collect.Lists; import io.druid.indexing.common.task.MergeTask; import io.druid.query.aggregation.AggregatorFactory; import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; import java.util.List; /** */ @JsonTypeName("test") -public class TestTask extends MergeTask +public class TestMergeTask extends MergeTask { - private final TaskStatus status; + public static TestMergeTask createDummyTask(String taskId) + { + return new TestMergeTask( + taskId, + "dummyDs", + Lists.newArrayList( + new DataSegment( + "dummyDs", + new Interval(new DateTime(), new DateTime()), + new DateTime().toString(), + null, + null, + null, + null, + 0, + 0 + ) + ), + Lists.newArrayList() + ); + } + + private final String id; @JsonCreator - public TestTask( + public TestMergeTask( @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments, - @JsonProperty("aggregations") List aggregators, - @JsonProperty("taskStatus") TaskStatus status + @JsonProperty("aggregations") List aggregators ) { super(id, dataSource, segments, aggregators); - this.status = status; + this.id = id; } @Override @@ -57,15 +79,9 @@ public class TestTask extends MergeTask return "test"; } - @JsonProperty - public TaskStatus getStatus() - { - return status; - } - @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - return status; + return TaskStatus.running(id); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/coordinator/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java similarity index 92% rename from indexing-service/src/test/java/io/druid/indexing/coordinator/TestRealtimeTask.java rename to indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java index 22f36bc5098..cc69067d23c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/coordinator/TestRealtimeTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java @@ -17,14 +17,12 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.coordinator; +package io.druid.indexing.common; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import io.druid.granularity.QueryGranularity; -import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.TaskResource; import io.druid.query.aggregation.AggregatorFactory; @@ -34,7 +32,7 @@ import io.druid.timeline.partition.NoneShardSpec; /** */ @JsonTypeName("test_realtime") -public class TestRealtimeTask extends RealtimeIndexTask +public class TestRealtimeTask extends RealtimeIndexTask implements TestTask { private final TaskStatus status; @@ -66,6 +64,7 @@ public class TestRealtimeTask extends RealtimeIndexTask return "test_realtime"; } + @Override @JsonProperty public TaskStatus getStatus() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestTask.java new file mode 100644 index 00000000000..fa0e9072c84 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestTask.java @@ -0,0 +1,30 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.common; + +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.task.Task; + +/** + */ +public interface TestTask extends Task +{ + public TaskStatus getStatus(); +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java new file mode 100644 index 00000000000..2ec6c474e6c --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java @@ -0,0 +1,48 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.common; + +import com.google.common.base.Stopwatch; +import com.metamx.common.ISE; + +import java.util.concurrent.TimeUnit; + +/** + */ +public class TestUtils +{ + public static boolean conditionValid(IndexingServiceCondition condition) + { + try { + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + while (!condition.isValid()) { + Thread.sleep(100); + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { + throw new ISE("Cannot find running task"); + } + } + } + catch (Exception e) { + return false; + } + return true; + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/coordinator/RemoteTaskRunnerTest.java index 47bfce6157b..e9eebfaba67 100644 --- a/indexing-service/src/test/java/io/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -20,62 +20,46 @@ package io.druid.indexing.coordinator; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.base.Function; import com.google.common.base.Joiner; -import com.google.common.base.Stopwatch; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.common.io.Files; import com.google.common.util.concurrent.ListenableFuture; -import com.metamx.common.ISE; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import io.druid.common.guava.DSuppliers; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.cache.SimplePathChildrenCacheFactory; -import io.druid.indexing.TestTask; +import io.druid.indexing.common.IndexingServiceCondition; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskToolboxFactory; -import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.TestMergeTask; +import io.druid.indexing.common.TestRealtimeTask; +import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; -import io.druid.indexing.coordinator.config.RemoteTaskRunnerConfig; import io.druid.indexing.coordinator.setup.WorkerSetupData; +import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; -import io.druid.indexing.worker.WorkerCuratorCoordinator; -import io.druid.indexing.worker.WorkerTaskMonitor; -import io.druid.indexing.worker.config.WorkerConfig; import io.druid.jackson.DefaultObjectMapper; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.server.initialization.ZkPathsConfig; -import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.zookeeper.CreateMode; import org.easymock.EasyMock; -import org.joda.time.DateTime; -import org.joda.time.Interval; -import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.io.File; import java.util.Arrays; import java.util.Set; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -/** - * Several of the tests here are integration tests rather than unit tests. We will introduce real unit tests for this - * class as well as integration tests in the very near future. - */ public class RemoteTaskRunnerTest { private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); @@ -88,10 +72,8 @@ public class RemoteTaskRunnerTest private TestingCluster testingCluster; private CuratorFramework cf; private RemoteTaskRunner remoteTaskRunner; - private WorkerCuratorCoordinator workerCuratorCoordinator; - private WorkerTaskMonitor workerTaskMonitor; - private TestTask task; + private TestMergeTask task; private Worker worker; @@ -108,26 +90,77 @@ public class RemoteTaskRunnerTest .build(); cf.start(); cf.create().creatingParentsIfNeeded().forPath(basePath); + cf.create().creatingParentsIfNeeded().forPath(tasksPath); + cf.create().creatingParentsIfNeeded().forPath(statusPath); - task = makeTask(TaskStatus.success("task")); + + task = TestMergeTask.createDummyTask("task"); } @After public void tearDown() throws Exception { remoteTaskRunner.stop(); - workerCuratorCoordinator.stop(); - workerTaskMonitor.stop(); cf.close(); testingCluster.stop(); } @Test - public void testRunNoExistingTask() throws Exception + public void testRun() throws Exception + { + doSetup(); + + ListenableFuture result = remoteTaskRunner.run(task); + + Assert.assertTrue(taskAnnounced(task.getId())); + mockWorkerRunningTask(task); + Assert.assertTrue(workerRunningTask(task.getId())); + mockWorkerCompleteSuccessfulTask(task); + Assert.assertTrue(workerCompletedTask(result)); + + Assert.assertEquals(task.getId(), result.get().getId()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode()); + } + + @Test + public void testRunExistingTaskThatHasntStartedRunning() throws Exception { doSetup(); remoteTaskRunner.run(task); + Assert.assertTrue(taskAnnounced(task.getId())); + + ListenableFuture result = remoteTaskRunner.run(task); + + Assert.assertFalse(result.isDone()); + mockWorkerRunningTask(task); + Assert.assertTrue(workerRunningTask(task.getId())); + mockWorkerCompleteSuccessfulTask(task); + Assert.assertTrue(workerCompletedTask(result)); + + Assert.assertEquals(task.getId(), result.get().getId()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode()); + } + + @Test + public void testRunExistingTaskThatHasStartedRunning() throws Exception + { + doSetup(); + + remoteTaskRunner.run(task); + Assert.assertTrue(taskAnnounced(task.getId())); + mockWorkerRunningTask(task); + Assert.assertTrue(workerRunningTask(task.getId())); + + ListenableFuture result = remoteTaskRunner.run(task); + + Assert.assertFalse(result.isDone()); + + mockWorkerCompleteSuccessfulTask(task); + Assert.assertTrue(workerCompletedTask(result)); + + Assert.assertEquals(task.getId(), result.get().getId()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode()); } @Test @@ -139,7 +172,7 @@ public class RemoteTaskRunnerTest doSetup(); - remoteTaskRunner.run(makeTask(TaskStatus.success(new String(new char[5000])))); + remoteTaskRunner.run(TestMergeTask.createDummyTask(new String(new char[5000]))); EasyMock.verify(emitter); } @@ -149,31 +182,43 @@ public class RemoteTaskRunnerTest { doSetup(); - TestRealtimeTask theTask = new TestRealtimeTask( - "rt1", - new TaskResource("rt1", 1), - "foo", - TaskStatus.running("rt1") - ); - remoteTaskRunner.run(theTask); - remoteTaskRunner.run( - new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2")) - ); - remoteTaskRunner.run( - new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3")) + TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1")); + remoteTaskRunner.run(task1); + Assert.assertTrue(taskAnnounced(task1.getId())); + mockWorkerRunningTask(task1); + + TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2")); + remoteTaskRunner.run(task2); + + TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3")); + remoteTaskRunner.run(task3); + + Assert.assertTrue( + TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + return remoteTaskRunner.getRunningTasks().size() == 2; + } + } + ) ); - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); - while (remoteTaskRunner.getRunningTasks().size() < 2) { - Thread.sleep(100); - if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { - throw new ISE("Cannot find running task"); - } - } + Assert.assertTrue( + TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + return remoteTaskRunner.getPendingTasks().size() == 1; + } + } + ) + ); - Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2); - Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1); Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); } @@ -182,53 +227,62 @@ public class RemoteTaskRunnerTest { doSetup(); - TestRealtimeTask theTask = new TestRealtimeTask( - "rt1", - new TaskResource("rt1", 1), - "foo", - TaskStatus.running("rt1") - ); - remoteTaskRunner.run(theTask); - remoteTaskRunner.run( - new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2")) - ); - remoteTaskRunner.run( - new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3")) + TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1")); + remoteTaskRunner.run(task1); + Assert.assertTrue(taskAnnounced(task1.getId())); + mockWorkerRunningTask(task1); + + TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2")); + remoteTaskRunner.run(task2); + + TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3")); + remoteTaskRunner.run(task3); + Assert.assertTrue(taskAnnounced(task3.getId())); + mockWorkerRunningTask(task3); + + Assert.assertTrue( + TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + return remoteTaskRunner.getRunningTasks().size() == 2; + } + } + ) ); - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); - while (remoteTaskRunner.getRunningTasks().size() < 2) { - Thread.sleep(100); - if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { - throw new ISE("Cannot find running task"); - } - } + Assert.assertTrue( + TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + return remoteTaskRunner.getPendingTasks().size() == 1; + } + } + ) + ); - Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2); - Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1); Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); } @Test - public void testFailure() throws Exception + public void testStatusRemoved() throws Exception { doSetup(); - ListenableFuture future = remoteTaskRunner.run(makeTask(TaskStatus.running("task"))); - final String taskStatus = joiner.join(statusPath, "task"); + ListenableFuture future = remoteTaskRunner.run(task); + Assert.assertTrue(taskAnnounced(task.getId())); + mockWorkerRunningTask(task); + + Assert.assertTrue(workerRunningTask(task.getId())); - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); - while (cf.checkExists().forPath(taskStatus) == null) { - Thread.sleep(100); - if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { - throw new ISE("Cannot find running task"); - } - } Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTask().getId().equals("task")); - cf.delete().forPath(taskStatus); + cf.delete().forPath(joiner.join(statusPath, task.getId())); TaskStatus status = future.get(); @@ -258,7 +312,7 @@ public class RemoteTaskRunnerTest Assert.assertTrue(existingTasks.contains("first")); Assert.assertTrue(existingTasks.contains("second")); - remoteTaskRunner.bootstrap(Arrays.asList(makeTask(TaskStatus.running("second")))); + remoteTaskRunner.bootstrap(Arrays.asList(TestMergeTask.createDummyTask("second"))); Set runningTasks = Sets.newHashSet( Iterables.transform( @@ -303,18 +357,14 @@ public class RemoteTaskRunnerTest { doSetup(); remoteTaskRunner.bootstrap(Lists.newArrayList()); - Future future = remoteTaskRunner.run(makeTask(TaskStatus.running("task"))); + Future future = remoteTaskRunner.run(task); - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); - while (cf.checkExists().forPath(joiner.join(statusPath, "task")) == null) { - Thread.sleep(100); - if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { - throw new ISE("Cannot find running task"); - } - } + Assert.assertTrue(taskAnnounced(task.getId())); + mockWorkerRunningTask(task); - workerCuratorCoordinator.stop(); + Assert.assertTrue(workerRunningTask(task.getId())); + + cf.delete().forPath(announcementsPath); TaskStatus status = future.get(); @@ -325,68 +375,6 @@ public class RemoteTaskRunnerTest { makeWorker(); makeRemoteTaskRunner(); - makeTaskMonitor(); - } - - private TestTask makeTask(TaskStatus status) - { - return new TestTask( - status.getId(), - "dummyDs", - Lists.newArrayList( - new DataSegment( - "dummyDs", - new Interval(new DateTime(), new DateTime()), - new DateTime().toString(), - null, - null, - null, - null, - 0, - 0 - ) - ), - Lists.newArrayList(), - status - ); - } - - private void makeTaskMonitor() throws Exception - { - workerCuratorCoordinator = new WorkerCuratorCoordinator( - jsonMapper, - new ZkPathsConfig() - { - @Override - public String getZkBasePath() - { - return basePath; - } - }, - new TestRemoteTaskRunnerConfig(), - cf, - worker - ); - workerCuratorCoordinator.start(); - - final File tmp = Files.createTempDir(); - - // Start a task monitor - workerTaskMonitor = new WorkerTaskMonitor( - jsonMapper, - cf, - workerCuratorCoordinator, - new ThreadPoolTaskRunner( - new TaskToolboxFactory( - new TaskConfig(tmp.toString(), null, null, 0), - null, null, null, null, null, null, null, null, null, jsonMapper - ) - ), - new WorkerConfig().setCapacity(1) - ); - jsonMapper.registerSubtypes(new NamedType(TestTask.class, "test")); - jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime")); - workerTaskMonitor.start(); } private void makeRemoteTaskRunner() throws Exception @@ -426,30 +414,60 @@ public class RemoteTaskRunnerTest ); } - private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig + private boolean taskAnnounced(final String taskId) { - @Override - public boolean isCompressZnodes() - { - return false; - } + return pathExists(joiner.join(tasksPath, taskId)); + } - @Override - public Period getTaskAssignmentTimeout() - { - return new Period(60000); - } + private boolean workerRunningTask(final String taskId) + { + return pathExists(joiner.join(statusPath, taskId)); + } - @Override - public long getMaxZnodeBytes() - { - return 1000; - } + private boolean pathExists(final String path) + { + return TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + try { + return cf.checkExists().forPath(path) != null; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + } - @Override - public String getWorkerVersion() - { - return ""; - } + private boolean workerCompletedTask(final ListenableFuture result) + { + return TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + return result.isDone(); + } + } + ); + } + + private void mockWorkerRunningTask(final Task task) throws Exception + { + cf.delete().forPath(joiner.join(tasksPath, task.getId())); + + TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId())); + cf.create().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); + } + + private void mockWorkerCompleteSuccessfulTask(final Task task) throws Exception + { + TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId())); + cf.setData().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/coordinator/TestRemoteTaskRunnerConfig.java b/indexing-service/src/test/java/io/druid/indexing/coordinator/TestRemoteTaskRunnerConfig.java new file mode 100644 index 00000000000..c006535251a --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/coordinator/TestRemoteTaskRunnerConfig.java @@ -0,0 +1,52 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.coordinator; + +import io.druid.indexing.coordinator.config.RemoteTaskRunnerConfig; +import org.joda.time.Period; + +/** + */ +public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig +{ + @Override + public boolean isCompressZnodes() + { + return false; + } + + @Override + public Period getTaskAssignmentTimeout() + { + return new Period("PT1S"); + } + + @Override + public long getMaxZnodeBytes() + { + return 1000; + } + + @Override + public String getWorkerVersion() + { + return ""; + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java index cbfcc484f9e..03fd983437c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -26,7 +26,7 @@ import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEventBuilder; import io.druid.common.guava.DSuppliers; -import io.druid.indexing.TestTask; +import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; import io.druid.indexing.coordinator.RemoteTaskRunnerWorkItem; @@ -69,7 +69,7 @@ public class SimpleResourceManagementStrategyTest ) ); - testTask = new TestTask( + testTask = new TestMergeTask( "task1", "dummyDs", Lists.newArrayList( @@ -85,8 +85,7 @@ public class SimpleResourceManagementStrategyTest 0 ) ), - Lists.newArrayList(), - TaskStatus.success("task1") + Lists.newArrayList() ); simpleResourceManagementStrategy = new SimpleResourceManagementStrategy( autoScalingStrategy, diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java new file mode 100644 index 00000000000..399ff4ab05a --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -0,0 +1,189 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.worker; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.base.Joiner; +import com.google.common.io.Files; +import io.druid.curator.PotentiallyGzippedCompressionProvider; +import io.druid.indexing.common.IndexingServiceCondition; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskToolboxFactory; +import io.druid.indexing.common.TestMergeTask; +import io.druid.indexing.common.TestRealtimeTask; +import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.coordinator.TestRemoteTaskRunnerConfig; +import io.druid.indexing.coordinator.ThreadPoolTaskRunner; +import io.druid.indexing.worker.config.WorkerConfig; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.initialization.ZkPathsConfig; +import junit.framework.Assert; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; + +/** + */ +public class WorkerTaskMonitorTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private static final Joiner joiner = Joiner.on("/"); + private static final String basePath = "/test/druid"; + private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath); + private static final String statusPath = String.format("%s/indexer/status/worker", basePath); + + private TestingCluster testingCluster; + private CuratorFramework cf; + private WorkerCuratorCoordinator workerCuratorCoordinator; + private WorkerTaskMonitor workerTaskMonitor; + + private TestMergeTask task; + + private Worker worker; + + @Before + public void setUp() throws Exception + { + testingCluster = new TestingCluster(1); + testingCluster.start(); + + cf = CuratorFrameworkFactory.builder() + .connectString(testingCluster.getConnectString()) + .retryPolicy(new ExponentialBackoffRetry(1, 10)) + .compressionProvider(new PotentiallyGzippedCompressionProvider(false)) + .build(); + cf.start(); + cf.create().creatingParentsIfNeeded().forPath(basePath); + //cf.create().creatingParentsIfNeeded().forPath(tasksPath); + //cf.create().creatingParentsIfNeeded().forPath(statusPath); + + worker = new Worker( + "worker", + "localhost", + 3, + "0" + ); + + workerCuratorCoordinator = new WorkerCuratorCoordinator( + jsonMapper, + new ZkPathsConfig() + { + @Override + public String getZkBasePath() + { + return basePath; + } + }, + new TestRemoteTaskRunnerConfig(), + cf, + worker + ); + workerCuratorCoordinator.start(); + + final File tmp = Files.createTempDir(); + + // Start a task monitor + workerTaskMonitor = new WorkerTaskMonitor( + jsonMapper, + cf, + workerCuratorCoordinator, + new ThreadPoolTaskRunner( + new TaskToolboxFactory( + new TaskConfig(tmp.toString(), null, null, 0), + null, null, null, null, null, null, null, null, null, jsonMapper + ) + ), + new WorkerConfig().setCapacity(1) + ); + jsonMapper.registerSubtypes(new NamedType(TestMergeTask.class, "test")); + jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime")); + workerTaskMonitor.start(); + + task = TestMergeTask.createDummyTask("test"); + } + + @After + public void tearDown() throws Exception + { + workerTaskMonitor.stop(); + cf.close(); + testingCluster.stop(); + } + + @Test + public void testRunTask() throws Exception + { + cf.create() + .creatingParentsIfNeeded() + .forPath(joiner.join(tasksPath, task.getId()), jsonMapper.writeValueAsBytes(task)); + + Assert.assertTrue( + TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + try { + return cf.checkExists().forPath(joiner.join(tasksPath, task.getId())) == null; + } + catch (Exception e) { + return false; + } + } + } + ) + ); + + + Assert.assertTrue( + TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + try { + return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null; + } + catch (Exception e) { + return false; + } + } + } + ) + ); + + TaskAnnouncement taskAnnouncement = jsonMapper.readValue( + cf.getData().forPath(joiner.join(statusPath, task.getId())), TaskAnnouncement.class + ); + + Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId()); + Assert.assertEquals(TaskStatus.Status.RUNNING, taskAnnouncement.getTaskStatus().getStatusCode()); + } +} \ No newline at end of file