rework tests in indexing service to be more unit testy

This commit is contained in:
fjy 2013-09-12 16:37:58 -07:00
parent a2dcc45a8e
commit f7c10e3594
12 changed files with 595 additions and 216 deletions

View File

@ -297,18 +297,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
if (runningTask != null) {
ZkWorker zkWorker = findWorkerRunningTask(task.getId());
if (zkWorker == null) {
log.makeAlert("Told to run task that is in the running queue but no worker is actually running it?!")
.addData("taskId", task.getId())
.emit();
runningTasks.remove(task.getId());
log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());
} else {
log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
if (announcement.getTaskStatus().isComplete()) {
taskComplete(runningTask, zkWorker, task.getId(), announcement.getTaskStatus());
}
return runningTask.getResult();
}
return runningTask.getResult();
}
RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId());
@ -552,12 +550,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
timeoutStopwatch.start();
synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker, task)) {
statusLock.wait(config.getTaskAssignmentTimeout().getMillis());
if (timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS) >= config.getTaskAssignmentTimeout().getMillis()) {
final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
statusLock.wait(waitMs);
long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
if (elapsed >= waitMs) {
log.error(
"Something went wrong! %s never ran task %s after %s!",
"Something went wrong! [%s] never ran task [%s]! Timeout: (%s >= %s)!",
theWorker.getHost(),
task.getId(),
elapsed,
config.getTaskAssignmentTimeout()
);

View File

@ -183,7 +183,7 @@ public class WorkerCuratorCoordinator
}
}
public void announceTask(TaskAnnouncement announcement)
public void announceTastAnnouncement(TaskAnnouncement announcement)
{
synchronized (lock) {
if (!started) {
@ -219,7 +219,7 @@ public class WorkerCuratorCoordinator
try {
if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) {
announceTask(announcement);
announceTastAnnouncement(announcement);
return;
}
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);

View File

@ -42,7 +42,7 @@ import java.util.concurrent.ExecutorService;
* The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be
* created that waits for new tasks. Tasks are executed as soon as they are seen.
* <p/>
* The monitor implements {@link QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
* The monitor implements {@link io.druid.query.QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
* realtime index tasks.
*/
public class WorkerTaskMonitor
@ -122,7 +122,7 @@ public class WorkerTaskMonitor
TaskStatus taskStatus;
try {
workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceTask(
workerCuratorCoordinator.announceTastAnnouncement(
TaskAnnouncement.create(
task,
TaskStatus.running(task.getId())

View File

@ -0,0 +1,27 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common;
/**
*/
public interface IndexingServiceCondition
{
public boolean isValid();
}

View File

@ -17,37 +17,59 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing;
package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import com.google.common.collect.Lists;
import io.druid.indexing.common.task.MergeTask;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
/**
*/
@JsonTypeName("test")
public class TestTask extends MergeTask
public class TestMergeTask extends MergeTask
{
private final TaskStatus status;
public static TestMergeTask createDummyTask(String taskId)
{
return new TestMergeTask(
taskId,
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0,
0
)
),
Lists.<AggregatorFactory>newArrayList()
);
}
private final String id;
@JsonCreator
public TestTask(
public TestMergeTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("taskStatus") TaskStatus status
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
)
{
super(id, dataSource, segments, aggregators);
this.status = status;
this.id = id;
}
@Override
@ -57,15 +79,9 @@ public class TestTask extends MergeTask
return "test";
}
@JsonProperty
public TaskStatus getStatus()
{
return status;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
return status;
return TaskStatus.running(id);
}
}

View File

@ -17,14 +17,12 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.coordinator;
package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.query.aggregation.AggregatorFactory;
@ -34,7 +32,7 @@ import io.druid.timeline.partition.NoneShardSpec;
/**
*/
@JsonTypeName("test_realtime")
public class TestRealtimeTask extends RealtimeIndexTask
public class TestRealtimeTask extends RealtimeIndexTask implements TestTask
{
private final TaskStatus status;
@ -66,6 +64,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
return "test_realtime";
}
@Override
@JsonProperty
public TaskStatus getStatus()
{

View File

@ -0,0 +1,30 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
/**
*/
public interface TestTask extends Task
{
public TaskStatus getStatus();
}

View File

@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common;
import com.google.common.base.Stopwatch;
import com.metamx.common.ISE;
import java.util.concurrent.TimeUnit;
/**
*/
public class TestUtils
{
public static boolean conditionValid(IndexingServiceCondition condition)
{
try {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (!condition.isValid()) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
}
catch (Exception e) {
return false;
}
return true;
}
}

View File

@ -20,62 +20,46 @@
package io.druid.indexing.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.indexing.TestTask;
import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TestRealtimeTask;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import io.druid.indexing.coordinator.setup.WorkerSetupData;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.indexing.worker.WorkerTaskMonitor;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Several of the tests here are integration tests rather than unit tests. We will introduce real unit tests for this
* class as well as integration tests in the very near future.
*/
public class RemoteTaskRunnerTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@ -88,10 +72,8 @@ public class RemoteTaskRunnerTest
private TestingCluster testingCluster;
private CuratorFramework cf;
private RemoteTaskRunner remoteTaskRunner;
private WorkerCuratorCoordinator workerCuratorCoordinator;
private WorkerTaskMonitor workerTaskMonitor;
private TestTask task;
private TestMergeTask task;
private Worker worker;
@ -108,26 +90,77 @@ public class RemoteTaskRunnerTest
.build();
cf.start();
cf.create().creatingParentsIfNeeded().forPath(basePath);
cf.create().creatingParentsIfNeeded().forPath(tasksPath);
cf.create().creatingParentsIfNeeded().forPath(statusPath);
task = makeTask(TaskStatus.success("task"));
task = TestMergeTask.createDummyTask("task");
}
@After
public void tearDown() throws Exception
{
remoteTaskRunner.stop();
workerCuratorCoordinator.stop();
workerTaskMonitor.stop();
cf.close();
testingCluster.stop();
}
@Test
public void testRunNoExistingTask() throws Exception
public void testRun() throws Exception
{
doSetup();
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
mockWorkerCompleteSuccessfulTask(task);
Assert.assertTrue(workerCompletedTask(result));
Assert.assertEquals(task.getId(), result.get().getId());
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
}
@Test
public void testRunExistingTaskThatHasntStartedRunning() throws Exception
{
doSetup();
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
Assert.assertFalse(result.isDone());
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
mockWorkerCompleteSuccessfulTask(task);
Assert.assertTrue(workerCompletedTask(result));
Assert.assertEquals(task.getId(), result.get().getId());
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
}
@Test
public void testRunExistingTaskThatHasStartedRunning() throws Exception
{
doSetup();
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
Assert.assertFalse(result.isDone());
mockWorkerCompleteSuccessfulTask(task);
Assert.assertTrue(workerCompletedTask(result));
Assert.assertEquals(task.getId(), result.get().getId());
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
}
@Test
@ -139,7 +172,7 @@ public class RemoteTaskRunnerTest
doSetup();
remoteTaskRunner.run(makeTask(TaskStatus.success(new String(new char[5000]))));
remoteTaskRunner.run(TestMergeTask.createDummyTask(new String(new char[5000])));
EasyMock.verify(emitter);
}
@ -149,31 +182,43 @@ public class RemoteTaskRunnerTest
{
doSetup();
TestRealtimeTask theTask = new TestRealtimeTask(
"rt1",
new TaskResource("rt1", 1),
"foo",
TaskStatus.running("rt1")
);
remoteTaskRunner.run(theTask);
remoteTaskRunner.run(
new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"))
);
remoteTaskRunner.run(
new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"))
TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"));
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"));
remoteTaskRunner.run(task2);
TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"));
remoteTaskRunner.run(task3);
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getRunningTasks().size() == 2;
}
}
)
);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (remoteTaskRunner.getRunningTasks().size() < 2) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getPendingTasks().size() == 1;
}
}
)
);
Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2"));
}
@ -182,53 +227,62 @@ public class RemoteTaskRunnerTest
{
doSetup();
TestRealtimeTask theTask = new TestRealtimeTask(
"rt1",
new TaskResource("rt1", 1),
"foo",
TaskStatus.running("rt1")
);
remoteTaskRunner.run(theTask);
remoteTaskRunner.run(
new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"))
);
remoteTaskRunner.run(
new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"))
TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"));
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"));
remoteTaskRunner.run(task2);
TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"));
remoteTaskRunner.run(task3);
Assert.assertTrue(taskAnnounced(task3.getId()));
mockWorkerRunningTask(task3);
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getRunningTasks().size() == 2;
}
}
)
);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (remoteTaskRunner.getRunningTasks().size() < 2) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getPendingTasks().size() == 1;
}
}
)
);
Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2"));
}
@Test
public void testFailure() throws Exception
public void testStatusRemoved() throws Exception
{
doSetup();
ListenableFuture<TaskStatus> future = remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
final String taskStatus = joiner.join(statusPath, "task");
ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (cf.checkExists().forPath(taskStatus) == null) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTask().getId().equals("task"));
cf.delete().forPath(taskStatus);
cf.delete().forPath(joiner.join(statusPath, task.getId()));
TaskStatus status = future.get();
@ -258,7 +312,7 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(existingTasks.contains("first"));
Assert.assertTrue(existingTasks.contains("second"));
remoteTaskRunner.bootstrap(Arrays.<Task>asList(makeTask(TaskStatus.running("second"))));
remoteTaskRunner.bootstrap(Arrays.<Task>asList(TestMergeTask.createDummyTask("second")));
Set<String> runningTasks = Sets.newHashSet(
Iterables.transform(
@ -303,18 +357,14 @@ public class RemoteTaskRunnerTest
{
doSetup();
remoteTaskRunner.bootstrap(Lists.<Task>newArrayList());
Future<TaskStatus> future = remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
Future<TaskStatus> future = remoteTaskRunner.run(task);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (cf.checkExists().forPath(joiner.join(statusPath, "task")) == null) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
workerCuratorCoordinator.stop();
Assert.assertTrue(workerRunningTask(task.getId()));
cf.delete().forPath(announcementsPath);
TaskStatus status = future.get();
@ -325,68 +375,6 @@ public class RemoteTaskRunnerTest
{
makeWorker();
makeRemoteTaskRunner();
makeTaskMonitor();
}
private TestTask makeTask(TaskStatus status)
{
return new TestTask(
status.getId(),
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0,
0
)
),
Lists.<AggregatorFactory>newArrayList(),
status
);
}
private void makeTaskMonitor() throws Exception
{
workerCuratorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new ZkPathsConfig()
{
@Override
public String getZkBasePath()
{
return basePath;
}
},
new TestRemoteTaskRunnerConfig(),
cf,
worker
);
workerCuratorCoordinator.start();
final File tmp = Files.createTempDir();
// Start a task monitor
workerTaskMonitor = new WorkerTaskMonitor(
jsonMapper,
cf,
workerCuratorCoordinator,
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 0),
null, null, null, null, null, null, null, null, null, jsonMapper
)
),
new WorkerConfig().setCapacity(1)
);
jsonMapper.registerSubtypes(new NamedType(TestTask.class, "test"));
jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime"));
workerTaskMonitor.start();
}
private void makeRemoteTaskRunner() throws Exception
@ -426,30 +414,60 @@ public class RemoteTaskRunnerTest
);
}
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
private boolean taskAnnounced(final String taskId)
{
@Override
public boolean isCompressZnodes()
{
return false;
}
return pathExists(joiner.join(tasksPath, taskId));
}
@Override
public Period getTaskAssignmentTimeout()
{
return new Period(60000);
}
private boolean workerRunningTask(final String taskId)
{
return pathExists(joiner.join(statusPath, taskId));
}
@Override
public long getMaxZnodeBytes()
{
return 1000;
}
private boolean pathExists(final String path)
{
return TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(path) != null;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
@Override
public String getWorkerVersion()
{
return "";
}
private boolean workerCompletedTask(final ListenableFuture<TaskStatus> result)
{
return TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return result.isDone();
}
}
);
}
private void mockWorkerRunningTask(final Task task) throws Exception
{
cf.delete().forPath(joiner.join(tasksPath, task.getId()));
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId()));
cf.create().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
}
private void mockWorkerCompleteSuccessfulTask(final Task task) throws Exception
{
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId()));
cf.setData().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
}
}

View File

@ -0,0 +1,52 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.coordinator;
import io.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import org.joda.time.Period;
/**
*/
public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
@Override
public boolean isCompressZnodes()
{
return false;
}
@Override
public Period getTaskAssignmentTimeout()
{
return new Period("PT1S");
}
@Override
public long getMaxZnodeBytes()
{
return 1000;
}
@Override
public String getWorkerVersion()
{
return "";
}
}

View File

@ -26,7 +26,7 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers;
import io.druid.indexing.TestTask;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
@ -69,7 +69,7 @@ public class SimpleResourceManagementStrategyTest
)
);
testTask = new TestTask(
testTask = new TestMergeTask(
"task1",
"dummyDs",
Lists.<DataSegment>newArrayList(
@ -85,8 +85,7 @@ public class SimpleResourceManagementStrategyTest
0
)
),
Lists.<AggregatorFactory>newArrayList(),
TaskStatus.success("task1")
Lists.<AggregatorFactory>newArrayList()
);
simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
autoScalingStrategy,

View File

@ -0,0 +1,189 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Joiner;
import com.google.common.io.Files;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TestRealtimeTask;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.coordinator.TestRemoteTaskRunnerConfig;
import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.initialization.ZkPathsConfig;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
/**
*/
public class WorkerTaskMonitorTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final Joiner joiner = Joiner.on("/");
private static final String basePath = "/test/druid";
private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath);
private static final String statusPath = String.format("%s/indexer/status/worker", basePath);
private TestingCluster testingCluster;
private CuratorFramework cf;
private WorkerCuratorCoordinator workerCuratorCoordinator;
private WorkerTaskMonitor workerTaskMonitor;
private TestMergeTask task;
private Worker worker;
@Before
public void setUp() throws Exception
{
testingCluster = new TestingCluster(1);
testingCluster.start();
cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build();
cf.start();
cf.create().creatingParentsIfNeeded().forPath(basePath);
//cf.create().creatingParentsIfNeeded().forPath(tasksPath);
//cf.create().creatingParentsIfNeeded().forPath(statusPath);
worker = new Worker(
"worker",
"localhost",
3,
"0"
);
workerCuratorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new ZkPathsConfig()
{
@Override
public String getZkBasePath()
{
return basePath;
}
},
new TestRemoteTaskRunnerConfig(),
cf,
worker
);
workerCuratorCoordinator.start();
final File tmp = Files.createTempDir();
// Start a task monitor
workerTaskMonitor = new WorkerTaskMonitor(
jsonMapper,
cf,
workerCuratorCoordinator,
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 0),
null, null, null, null, null, null, null, null, null, jsonMapper
)
),
new WorkerConfig().setCapacity(1)
);
jsonMapper.registerSubtypes(new NamedType(TestMergeTask.class, "test"));
jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime"));
workerTaskMonitor.start();
task = TestMergeTask.createDummyTask("test");
}
@After
public void tearDown() throws Exception
{
workerTaskMonitor.stop();
cf.close();
testingCluster.stop();
}
@Test
public void testRunTask() throws Exception
{
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(tasksPath, task.getId()), jsonMapper.writeValueAsBytes(task));
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(joiner.join(tasksPath, task.getId())) == null;
}
catch (Exception e) {
return false;
}
}
}
)
);
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null;
}
catch (Exception e) {
return false;
}
}
}
)
);
TaskAnnouncement taskAnnouncement = jsonMapper.readValue(
cf.getData().forPath(joiner.join(statusPath, task.getId())), TaskAnnouncement.class
);
Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId());
Assert.assertEquals(TaskStatus.Status.RUNNING, taskAnnouncement.getTaskStatus().getStatusCode());
}
}