TaskQueue: Copy task list before management loop.

This commit is contained in:
Gian Merlino 2013-12-13 00:35:33 -08:00
parent aef7726c33
commit 6227963af9
4 changed files with 86 additions and 27 deletions

View File

@ -20,7 +20,9 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskStatus;
@ -34,14 +36,34 @@ public class NoopTask extends AbstractTask
{
private static final Logger log = new Logger(NoopTask.class);
private static int defaultRunTime = 2500;
private static int defaultIsReadyTime = 0;
private static IsReadyResult defaultIsReadyResult = IsReadyResult.YES;
private final int runTime;
enum IsReadyResult
{
YES,
NO,
EXCEPTION
}
@JsonIgnore
private final long runTime;
@JsonIgnore
private final long isReadyTime;
@JsonIgnore
private final IsReadyResult isReadyResult;
@JsonIgnore
private final FirehoseFactory firehoseFactory;
@JsonCreator
public NoopTask(
@JsonProperty("id") String id,
@JsonProperty("runTime") int runTime,
@JsonProperty("runTime") long runTime,
@JsonProperty("isReadyTime") long isReadyTime,
@JsonProperty("isReadyResult") String isReadyResult,
@JsonProperty("firehose") FirehoseFactory firehoseFactory
)
{
@ -51,6 +73,10 @@ public class NoopTask extends AbstractTask
);
this.runTime = (runTime == 0) ? defaultRunTime : runTime;
this.isReadyTime = (isReadyTime == 0) ? defaultIsReadyTime : isReadyTime;
this.isReadyResult = (isReadyResult == null)
? defaultIsReadyResult
: IsReadyResult.valueOf(isReadyResult.toUpperCase());
this.firehoseFactory = firehoseFactory;
}
@ -60,12 +86,24 @@ public class NoopTask extends AbstractTask
return "noop";
}
@JsonProperty("runTime")
public int getRunTime()
@JsonProperty
public long getRunTime()
{
return runTime;
}
@JsonProperty
public long getIsReadyTime()
{
return isReadyTime;
}
@JsonProperty
public IsReadyResult getIsReadyResult()
{
return isReadyResult;
}
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
@ -75,7 +113,16 @@ public class NoopTask extends AbstractTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return true;
switch (isReadyResult) {
case YES:
return true;
case NO:
return false;
case EXCEPTION:
throw new ISE("Not ready. Never will be ready. Go away!");
default:
throw new AssertionError("#notreached");
}
}
@Override

View File

@ -24,6 +24,7 @@ import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@ -224,7 +225,8 @@ public class TaskQueue
runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
}
// Attain futures for all active tasks (assuming they are ready to run).
for (final Task task : tasks) {
// Copy tasks list, as notifyStatus may modify it.
for (final Task task : ImmutableList.copyOf(tasks)) {
if (!taskFutures.containsKey(task.getId())) {
final ListenableFuture<TaskStatus> runnerTaskFuture;
if (runnerTaskFutures.containsKey(task.getId())) {

View File

@ -32,7 +32,7 @@ import io.druid.timeline.partition.NoneShardSpec;
/**
*/
@JsonTypeName("test_realtime")
public class TestRealtimeTask extends RealtimeIndexTask implements TestTask
public class TestRealtimeTask extends RealtimeIndexTask
{
private final TaskStatus status;
@ -64,13 +64,6 @@ public class TestRealtimeTask extends RealtimeIndexTask implements TestTask
return "test_realtime";
}
@Override
@JsonProperty
public TaskStatus getStatus()
{
return status;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{

View File

@ -19,6 +19,7 @@
package io.druid.indexing.overlord;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@ -115,7 +116,10 @@ public class TaskLifecycleTest
tmp = Files.createTempDir();
final TaskQueueConfig tqc = new DefaultObjectMapper().readValue("{\"startDelay\":\"PT0S\"}", TaskQueueConfig.class);
final TaskQueueConfig tqc = new DefaultObjectMapper().readValue(
"{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\"}",
TaskQueueConfig.class
);
ts = new HeapMemoryTaskStorage();
tsqa = new TaskStorageQueryAdapter(ts);
tl = new TaskLockbox(ts);
@ -400,28 +404,41 @@ public class TaskLifecycleTest
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
}
private TaskStatus runTask(Task task)
private TaskStatus runTask(final Task task) throws Exception
{
final Task dummyTask = new DefaultObjectMapper().readValue(
"{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"",
Task.class
);
final long startTime = System.currentTimeMillis();
Preconditions.checkArgument(!task.getId().equals(dummyTask.getId()));
tq.add(dummyTask);
tq.add(task);
TaskStatus status;
TaskStatus retVal = null;
try {
while ((status = tsqa.getStatus(task.getId()).get()).isRunnable()) {
if (System.currentTimeMillis() > startTime + 10 * 1000) {
throw new ISE("Where did the task go?!: %s", task.getId());
for (final String taskId : ImmutableList.of(dummyTask.getId(), task.getId())) {
try {
TaskStatus status;
while ((status = tsqa.getStatus(taskId).get()).isRunnable()) {
if (System.currentTimeMillis() > startTime + 10 * 1000) {
throw new ISE("Where did the task go?!: %s", task.getId());
}
Thread.sleep(100);
}
Thread.sleep(100);
if (taskId.equals(task.getId())) {
retVal = status;
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return status;
return retVal;
}
private static class MockIndexerDBCoordinator extends IndexerDBCoordinator