Merge pull request #237 from metamx/is-ut

Rework tests in indexing service to be more unit testy
This commit is contained in:
cheddar 2013-09-13 13:21:03 -07:00
commit ef1ac46a7e
12 changed files with 595 additions and 216 deletions

View File

@ -297,18 +297,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
if (runningTask != null) { if (runningTask != null) {
ZkWorker zkWorker = findWorkerRunningTask(task.getId()); ZkWorker zkWorker = findWorkerRunningTask(task.getId());
if (zkWorker == null) { if (zkWorker == null) {
log.makeAlert("Told to run task that is in the running queue but no worker is actually running it?!") log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());
.addData("taskId", task.getId())
.emit();
runningTasks.remove(task.getId());
} else { } else {
log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost()); log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId()); TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
if (announcement.getTaskStatus().isComplete()) { if (announcement.getTaskStatus().isComplete()) {
taskComplete(runningTask, zkWorker, task.getId(), announcement.getTaskStatus()); taskComplete(runningTask, zkWorker, task.getId(), announcement.getTaskStatus());
} }
return runningTask.getResult();
} }
return runningTask.getResult();
} }
RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId()); RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId());
@ -552,12 +550,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
timeoutStopwatch.start(); timeoutStopwatch.start();
synchronized (statusLock) { synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker, task)) { while (!isWorkerRunningTask(theWorker, task)) {
statusLock.wait(config.getTaskAssignmentTimeout().getMillis()); final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
if (timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS) >= config.getTaskAssignmentTimeout().getMillis()) { statusLock.wait(waitMs);
long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
if (elapsed >= waitMs) {
log.error( log.error(
"Something went wrong! %s never ran task %s after %s!", "Something went wrong! [%s] never ran task [%s]! Timeout: (%s >= %s)!",
theWorker.getHost(), theWorker.getHost(),
task.getId(), task.getId(),
elapsed,
config.getTaskAssignmentTimeout() config.getTaskAssignmentTimeout()
); );

View File

@ -183,7 +183,7 @@ public class WorkerCuratorCoordinator
} }
} }
public void announceTask(TaskAnnouncement announcement) public void announceTastAnnouncement(TaskAnnouncement announcement)
{ {
synchronized (lock) { synchronized (lock) {
if (!started) { if (!started) {
@ -219,7 +219,7 @@ public class WorkerCuratorCoordinator
try { try {
if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) { if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) {
announceTask(announcement); announceTastAnnouncement(announcement);
return; return;
} }
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement); byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);

View File

@ -42,7 +42,7 @@ import java.util.concurrent.ExecutorService;
* The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be * The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be
* created that waits for new tasks. Tasks are executed as soon as they are seen. * created that waits for new tasks. Tasks are executed as soon as they are seen.
* <p/> * <p/>
* The monitor implements {@link QuerySegmentWalker} so tasks can offer up queryable data. This is useful for * The monitor implements {@link io.druid.query.QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
* realtime index tasks. * realtime index tasks.
*/ */
public class WorkerTaskMonitor public class WorkerTaskMonitor
@ -122,7 +122,7 @@ public class WorkerTaskMonitor
TaskStatus taskStatus; TaskStatus taskStatus;
try { try {
workerCuratorCoordinator.unannounceTask(task.getId()); workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceTask( workerCuratorCoordinator.announceTastAnnouncement(
TaskAnnouncement.create( TaskAnnouncement.create(
task, task,
TaskStatus.running(task.getId()) TaskStatus.running(task.getId())

View File

@ -0,0 +1,27 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common;
/**
*/
public interface IndexingServiceCondition
{
public boolean isValid();
}

View File

@ -17,37 +17,59 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing; package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.indexing.common.TaskStatus; import com.google.common.collect.Lists;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.task.MergeTask; import io.druid.indexing.common.task.MergeTask;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List; import java.util.List;
/** /**
*/ */
@JsonTypeName("test") @JsonTypeName("test")
public class TestTask extends MergeTask public class TestMergeTask extends MergeTask
{ {
private final TaskStatus status; public static TestMergeTask createDummyTask(String taskId)
{
return new TestMergeTask(
taskId,
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0,
0
)
),
Lists.<AggregatorFactory>newArrayList()
);
}
private final String id;
@JsonCreator @JsonCreator
public TestTask( public TestMergeTask(
@JsonProperty("id") String id, @JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource, @JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments, @JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators, @JsonProperty("aggregations") List<AggregatorFactory> aggregators
@JsonProperty("taskStatus") TaskStatus status
) )
{ {
super(id, dataSource, segments, aggregators); super(id, dataSource, segments, aggregators);
this.status = status; this.id = id;
} }
@Override @Override
@ -57,15 +79,9 @@ public class TestTask extends MergeTask
return "test"; return "test";
} }
@JsonProperty
public TaskStatus getStatus()
{
return status;
}
@Override @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
return status; return TaskStatus.running(id);
} }
} }

View File

@ -17,14 +17,12 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.coordinator; package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.common.task.TaskResource;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
@ -34,7 +32,7 @@ import io.druid.timeline.partition.NoneShardSpec;
/** /**
*/ */
@JsonTypeName("test_realtime") @JsonTypeName("test_realtime")
public class TestRealtimeTask extends RealtimeIndexTask public class TestRealtimeTask extends RealtimeIndexTask implements TestTask
{ {
private final TaskStatus status; private final TaskStatus status;
@ -66,6 +64,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
return "test_realtime"; return "test_realtime";
} }
@Override
@JsonProperty @JsonProperty
public TaskStatus getStatus() public TaskStatus getStatus()
{ {

View File

@ -0,0 +1,30 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
/**
*/
public interface TestTask extends Task
{
public TaskStatus getStatus();
}

View File

@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common;
import com.google.common.base.Stopwatch;
import com.metamx.common.ISE;
import java.util.concurrent.TimeUnit;
/**
*/
public class TestUtils
{
public static boolean conditionValid(IndexingServiceCondition condition)
{
try {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (!condition.isValid()) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
}
catch (Exception e) {
return false;
}
return true;
}
}

View File

@ -20,62 +20,46 @@
package io.druid.indexing.coordinator; package io.druid.indexing.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers; import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.indexing.TestTask; import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.TestRealtimeTask;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import io.druid.indexing.coordinator.setup.WorkerSetupData; import io.druid.indexing.coordinator.setup.WorkerSetupData;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker; import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.indexing.worker.WorkerTaskMonitor;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.File;
import java.util.Arrays; import java.util.Arrays;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/**
* Several of the tests here are integration tests rather than unit tests. We will introduce real unit tests for this
* class as well as integration tests in the very near future.
*/
public class RemoteTaskRunnerTest public class RemoteTaskRunnerTest
{ {
private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@ -88,10 +72,8 @@ public class RemoteTaskRunnerTest
private TestingCluster testingCluster; private TestingCluster testingCluster;
private CuratorFramework cf; private CuratorFramework cf;
private RemoteTaskRunner remoteTaskRunner; private RemoteTaskRunner remoteTaskRunner;
private WorkerCuratorCoordinator workerCuratorCoordinator;
private WorkerTaskMonitor workerTaskMonitor;
private TestTask task; private TestMergeTask task;
private Worker worker; private Worker worker;
@ -108,26 +90,77 @@ public class RemoteTaskRunnerTest
.build(); .build();
cf.start(); cf.start();
cf.create().creatingParentsIfNeeded().forPath(basePath); cf.create().creatingParentsIfNeeded().forPath(basePath);
cf.create().creatingParentsIfNeeded().forPath(tasksPath);
cf.create().creatingParentsIfNeeded().forPath(statusPath);
task = makeTask(TaskStatus.success("task"));
task = TestMergeTask.createDummyTask("task");
} }
@After @After
public void tearDown() throws Exception public void tearDown() throws Exception
{ {
remoteTaskRunner.stop(); remoteTaskRunner.stop();
workerCuratorCoordinator.stop();
workerTaskMonitor.stop();
cf.close(); cf.close();
testingCluster.stop(); testingCluster.stop();
} }
@Test @Test
public void testRunNoExistingTask() throws Exception public void testRun() throws Exception
{
doSetup();
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
mockWorkerCompleteSuccessfulTask(task);
Assert.assertTrue(workerCompletedTask(result));
Assert.assertEquals(task.getId(), result.get().getId());
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
}
@Test
public void testRunExistingTaskThatHasntStartedRunning() throws Exception
{ {
doSetup(); doSetup();
remoteTaskRunner.run(task); remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
Assert.assertFalse(result.isDone());
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
mockWorkerCompleteSuccessfulTask(task);
Assert.assertTrue(workerCompletedTask(result));
Assert.assertEquals(task.getId(), result.get().getId());
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
}
@Test
public void testRunExistingTaskThatHasStartedRunning() throws Exception
{
doSetup();
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
Assert.assertFalse(result.isDone());
mockWorkerCompleteSuccessfulTask(task);
Assert.assertTrue(workerCompletedTask(result));
Assert.assertEquals(task.getId(), result.get().getId());
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
} }
@Test @Test
@ -139,7 +172,7 @@ public class RemoteTaskRunnerTest
doSetup(); doSetup();
remoteTaskRunner.run(makeTask(TaskStatus.success(new String(new char[5000])))); remoteTaskRunner.run(TestMergeTask.createDummyTask(new String(new char[5000])));
EasyMock.verify(emitter); EasyMock.verify(emitter);
} }
@ -149,31 +182,43 @@ public class RemoteTaskRunnerTest
{ {
doSetup(); doSetup();
TestRealtimeTask theTask = new TestRealtimeTask( TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"));
"rt1", remoteTaskRunner.run(task1);
new TaskResource("rt1", 1), Assert.assertTrue(taskAnnounced(task1.getId()));
"foo", mockWorkerRunningTask(task1);
TaskStatus.running("rt1")
); TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"));
remoteTaskRunner.run(theTask); remoteTaskRunner.run(task2);
remoteTaskRunner.run(
new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2")) TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"));
); remoteTaskRunner.run(task3);
remoteTaskRunner.run(
new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3")) Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getRunningTasks().size() == 2;
}
}
)
); );
Stopwatch stopwatch = new Stopwatch(); Assert.assertTrue(
stopwatch.start(); TestUtils.conditionValid(
while (remoteTaskRunner.getRunningTasks().size() < 2) { new IndexingServiceCondition()
Thread.sleep(100); {
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { @Override
throw new ISE("Cannot find running task"); public boolean isValid()
} {
} return remoteTaskRunner.getPendingTasks().size() == 1;
}
}
)
);
Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2"));
} }
@ -182,53 +227,62 @@ public class RemoteTaskRunnerTest
{ {
doSetup(); doSetup();
TestRealtimeTask theTask = new TestRealtimeTask( TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"));
"rt1", remoteTaskRunner.run(task1);
new TaskResource("rt1", 1), Assert.assertTrue(taskAnnounced(task1.getId()));
"foo", mockWorkerRunningTask(task1);
TaskStatus.running("rt1")
); TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"));
remoteTaskRunner.run(theTask); remoteTaskRunner.run(task2);
remoteTaskRunner.run(
new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2")) TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"));
); remoteTaskRunner.run(task3);
remoteTaskRunner.run( Assert.assertTrue(taskAnnounced(task3.getId()));
new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3")) mockWorkerRunningTask(task3);
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getRunningTasks().size() == 2;
}
}
)
); );
Stopwatch stopwatch = new Stopwatch(); Assert.assertTrue(
stopwatch.start(); TestUtils.conditionValid(
while (remoteTaskRunner.getRunningTasks().size() < 2) { new IndexingServiceCondition()
Thread.sleep(100); {
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { @Override
throw new ISE("Cannot find running task"); public boolean isValid()
} {
} return remoteTaskRunner.getPendingTasks().size() == 1;
}
}
)
);
Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2"));
} }
@Test @Test
public void testFailure() throws Exception public void testStatusRemoved() throws Exception
{ {
doSetup(); doSetup();
ListenableFuture<TaskStatus> future = remoteTaskRunner.run(makeTask(TaskStatus.running("task"))); ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task);
final String taskStatus = joiner.join(statusPath, "task"); Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (cf.checkExists().forPath(taskStatus) == null) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTask().getId().equals("task")); Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTask().getId().equals("task"));
cf.delete().forPath(taskStatus); cf.delete().forPath(joiner.join(statusPath, task.getId()));
TaskStatus status = future.get(); TaskStatus status = future.get();
@ -258,7 +312,7 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(existingTasks.contains("first")); Assert.assertTrue(existingTasks.contains("first"));
Assert.assertTrue(existingTasks.contains("second")); Assert.assertTrue(existingTasks.contains("second"));
remoteTaskRunner.bootstrap(Arrays.<Task>asList(makeTask(TaskStatus.running("second")))); remoteTaskRunner.bootstrap(Arrays.<Task>asList(TestMergeTask.createDummyTask("second")));
Set<String> runningTasks = Sets.newHashSet( Set<String> runningTasks = Sets.newHashSet(
Iterables.transform( Iterables.transform(
@ -303,18 +357,14 @@ public class RemoteTaskRunnerTest
{ {
doSetup(); doSetup();
remoteTaskRunner.bootstrap(Lists.<Task>newArrayList()); remoteTaskRunner.bootstrap(Lists.<Task>newArrayList());
Future<TaskStatus> future = remoteTaskRunner.run(makeTask(TaskStatus.running("task"))); Future<TaskStatus> future = remoteTaskRunner.run(task);
Stopwatch stopwatch = new Stopwatch(); Assert.assertTrue(taskAnnounced(task.getId()));
stopwatch.start(); mockWorkerRunningTask(task);
while (cf.checkExists().forPath(joiner.join(statusPath, "task")) == null) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
workerCuratorCoordinator.stop(); Assert.assertTrue(workerRunningTask(task.getId()));
cf.delete().forPath(announcementsPath);
TaskStatus status = future.get(); TaskStatus status = future.get();
@ -325,68 +375,6 @@ public class RemoteTaskRunnerTest
{ {
makeWorker(); makeWorker();
makeRemoteTaskRunner(); makeRemoteTaskRunner();
makeTaskMonitor();
}
private TestTask makeTask(TaskStatus status)
{
return new TestTask(
status.getId(),
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0,
0
)
),
Lists.<AggregatorFactory>newArrayList(),
status
);
}
private void makeTaskMonitor() throws Exception
{
workerCuratorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new ZkPathsConfig()
{
@Override
public String getZkBasePath()
{
return basePath;
}
},
new TestRemoteTaskRunnerConfig(),
cf,
worker
);
workerCuratorCoordinator.start();
final File tmp = Files.createTempDir();
// Start a task monitor
workerTaskMonitor = new WorkerTaskMonitor(
jsonMapper,
cf,
workerCuratorCoordinator,
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 0),
null, null, null, null, null, null, null, null, null, jsonMapper
)
),
new WorkerConfig().setCapacity(1)
);
jsonMapper.registerSubtypes(new NamedType(TestTask.class, "test"));
jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime"));
workerTaskMonitor.start();
} }
private void makeRemoteTaskRunner() throws Exception private void makeRemoteTaskRunner() throws Exception
@ -426,30 +414,60 @@ public class RemoteTaskRunnerTest
); );
} }
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig private boolean taskAnnounced(final String taskId)
{ {
@Override return pathExists(joiner.join(tasksPath, taskId));
public boolean isCompressZnodes() }
{
return false;
}
@Override private boolean workerRunningTask(final String taskId)
public Period getTaskAssignmentTimeout() {
{ return pathExists(joiner.join(statusPath, taskId));
return new Period(60000); }
}
@Override private boolean pathExists(final String path)
public long getMaxZnodeBytes() {
{ return TestUtils.conditionValid(
return 1000; new IndexingServiceCondition()
} {
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(path) != null;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
@Override private boolean workerCompletedTask(final ListenableFuture<TaskStatus> result)
public String getWorkerVersion() {
{ return TestUtils.conditionValid(
return ""; new IndexingServiceCondition()
} {
@Override
public boolean isValid()
{
return result.isDone();
}
}
);
}
private void mockWorkerRunningTask(final Task task) throws Exception
{
cf.delete().forPath(joiner.join(tasksPath, task.getId()));
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId()));
cf.create().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
}
private void mockWorkerCompleteSuccessfulTask(final Task task) throws Exception
{
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId()));
cf.setData().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
} }
} }

View File

@ -0,0 +1,52 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.coordinator;
import io.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import org.joda.time.Period;
/**
*/
public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
@Override
public boolean isCompressZnodes()
{
return false;
}
@Override
public Period getTaskAssignmentTimeout()
{
return new Period("PT1S");
}
@Override
public long getMaxZnodeBytes()
{
return 1000;
}
@Override
public String getWorkerVersion()
{
return "";
}
}

View File

@ -26,7 +26,7 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder; import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers; import io.druid.common.guava.DSuppliers;
import io.druid.indexing.TestTask; import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.coordinator.RemoteTaskRunnerWorkItem; import io.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
@ -69,7 +69,7 @@ public class SimpleResourceManagementStrategyTest
) )
); );
testTask = new TestTask( testTask = new TestMergeTask(
"task1", "task1",
"dummyDs", "dummyDs",
Lists.<DataSegment>newArrayList( Lists.<DataSegment>newArrayList(
@ -85,8 +85,7 @@ public class SimpleResourceManagementStrategyTest
0 0
) )
), ),
Lists.<AggregatorFactory>newArrayList(), Lists.<AggregatorFactory>newArrayList()
TaskStatus.success("task1")
); );
simpleResourceManagementStrategy = new SimpleResourceManagementStrategy( simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
autoScalingStrategy, autoScalingStrategy,

View File

@ -0,0 +1,189 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Joiner;
import com.google.common.io.Files;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TestRealtimeTask;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.coordinator.TestRemoteTaskRunnerConfig;
import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.initialization.ZkPathsConfig;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
/**
*/
public class WorkerTaskMonitorTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final Joiner joiner = Joiner.on("/");
private static final String basePath = "/test/druid";
private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath);
private static final String statusPath = String.format("%s/indexer/status/worker", basePath);
private TestingCluster testingCluster;
private CuratorFramework cf;
private WorkerCuratorCoordinator workerCuratorCoordinator;
private WorkerTaskMonitor workerTaskMonitor;
private TestMergeTask task;
private Worker worker;
@Before
public void setUp() throws Exception
{
testingCluster = new TestingCluster(1);
testingCluster.start();
cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build();
cf.start();
cf.create().creatingParentsIfNeeded().forPath(basePath);
//cf.create().creatingParentsIfNeeded().forPath(tasksPath);
//cf.create().creatingParentsIfNeeded().forPath(statusPath);
worker = new Worker(
"worker",
"localhost",
3,
"0"
);
workerCuratorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new ZkPathsConfig()
{
@Override
public String getZkBasePath()
{
return basePath;
}
},
new TestRemoteTaskRunnerConfig(),
cf,
worker
);
workerCuratorCoordinator.start();
final File tmp = Files.createTempDir();
// Start a task monitor
workerTaskMonitor = new WorkerTaskMonitor(
jsonMapper,
cf,
workerCuratorCoordinator,
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 0),
null, null, null, null, null, null, null, null, null, jsonMapper
)
),
new WorkerConfig().setCapacity(1)
);
jsonMapper.registerSubtypes(new NamedType(TestMergeTask.class, "test"));
jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime"));
workerTaskMonitor.start();
task = TestMergeTask.createDummyTask("test");
}
@After
public void tearDown() throws Exception
{
workerTaskMonitor.stop();
cf.close();
testingCluster.stop();
}
@Test
public void testRunTask() throws Exception
{
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(tasksPath, task.getId()), jsonMapper.writeValueAsBytes(task));
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(joiner.join(tasksPath, task.getId())) == null;
}
catch (Exception e) {
return false;
}
}
}
)
);
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null;
}
catch (Exception e) {
return false;
}
}
}
)
);
TaskAnnouncement taskAnnouncement = jsonMapper.readValue(
cf.getData().forPath(joiner.join(statusPath, task.getId())), TaskAnnouncement.class
);
Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId());
Assert.assertEquals(TaskStatus.Status.RUNNING, taskAnnouncement.getTaskStatus().getStatusCode());
}
}