From 6227963af9d9a7065438ab06ac0f43418ff82925 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 13 Dec 2013 00:35:33 -0800 Subject: [PATCH] TaskQueue: Copy task list before management loop. --- .../druid/indexing/common/task/NoopTask.java | 57 +++++++++++++++++-- .../io/druid/indexing/overlord/TaskQueue.java | 4 +- .../indexing/common/TestRealtimeTask.java | 9 +-- .../indexing/overlord/TaskLifecycleTest.java | 43 +++++++++----- 4 files changed, 86 insertions(+), 27 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index 001e1520eaa..d45f66377b7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -20,7 +20,9 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskStatus; @@ -34,14 +36,34 @@ public class NoopTask extends AbstractTask { private static final Logger log = new Logger(NoopTask.class); private static int defaultRunTime = 2500; + private static int defaultIsReadyTime = 0; + private static IsReadyResult defaultIsReadyResult = IsReadyResult.YES; - private final int runTime; + enum IsReadyResult + { + YES, + NO, + EXCEPTION + } + + @JsonIgnore + private final long runTime; + + @JsonIgnore + private final long isReadyTime; + + @JsonIgnore + private final IsReadyResult isReadyResult; + + @JsonIgnore private final FirehoseFactory firehoseFactory; @JsonCreator public NoopTask( @JsonProperty("id") String id, - @JsonProperty("runTime") int runTime, + @JsonProperty("runTime") long runTime, + @JsonProperty("isReadyTime") long isReadyTime, + @JsonProperty("isReadyResult") String isReadyResult, @JsonProperty("firehose") FirehoseFactory firehoseFactory ) { @@ -51,6 +73,10 @@ public class NoopTask extends AbstractTask ); this.runTime = (runTime == 0) ? defaultRunTime : runTime; + this.isReadyTime = (isReadyTime == 0) ? defaultIsReadyTime : isReadyTime; + this.isReadyResult = (isReadyResult == null) + ? defaultIsReadyResult + : IsReadyResult.valueOf(isReadyResult.toUpperCase()); this.firehoseFactory = firehoseFactory; } @@ -60,12 +86,24 @@ public class NoopTask extends AbstractTask return "noop"; } - @JsonProperty("runTime") - public int getRunTime() + @JsonProperty + public long getRunTime() { return runTime; } + @JsonProperty + public long getIsReadyTime() + { + return isReadyTime; + } + + @JsonProperty + public IsReadyResult getIsReadyResult() + { + return isReadyResult; + } + @JsonProperty("firehose") public FirehoseFactory getFirehoseFactory() { @@ -75,7 +113,16 @@ public class NoopTask extends AbstractTask @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { - return true; + switch (isReadyResult) { + case YES: + return true; + case NO: + return false; + case EXCEPTION: + throw new ISE("Not ready. Never will be ready. Go away!"); + default: + throw new AssertionError("#notreached"); + } } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java index 77e4b26e372..76ce14e8320 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java @@ -24,6 +24,7 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -224,7 +225,8 @@ public class TaskQueue runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); } // Attain futures for all active tasks (assuming they are ready to run). - for (final Task task : tasks) { + // Copy tasks list, as notifyStatus may modify it. + for (final Task task : ImmutableList.copyOf(tasks)) { if (!taskFutures.containsKey(task.getId())) { final ListenableFuture runnerTaskFuture; if (runnerTaskFutures.containsKey(task.getId())) { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java index cc69067d23c..178cae10513 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java @@ -32,7 +32,7 @@ import io.druid.timeline.partition.NoneShardSpec; /** */ @JsonTypeName("test_realtime") -public class TestRealtimeTask extends RealtimeIndexTask implements TestTask +public class TestRealtimeTask extends RealtimeIndexTask { private final TaskStatus status; @@ -64,13 +64,6 @@ public class TestRealtimeTask extends RealtimeIndexTask implements TestTask return "test_realtime"; } - @Override - @JsonProperty - public TaskStatus getStatus() - { - return status; - } - @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index ac90c1e12f0..910b0c04e36 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -19,6 +19,7 @@ package io.druid.indexing.overlord; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -115,7 +116,10 @@ public class TaskLifecycleTest tmp = Files.createTempDir(); - final TaskQueueConfig tqc = new DefaultObjectMapper().readValue("{\"startDelay\":\"PT0S\"}", TaskQueueConfig.class); + final TaskQueueConfig tqc = new DefaultObjectMapper().readValue( + "{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\"}", + TaskQueueConfig.class + ); ts = new HeapMemoryTaskStorage(); tsqa = new TaskStorageQueryAdapter(ts); tl = new TaskLockbox(ts); @@ -400,28 +404,41 @@ public class TaskLifecycleTest Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); } - private TaskStatus runTask(Task task) + private TaskStatus runTask(final Task task) throws Exception { + final Task dummyTask = new DefaultObjectMapper().readValue( + "{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"", + Task.class + ); final long startTime = System.currentTimeMillis(); + Preconditions.checkArgument(!task.getId().equals(dummyTask.getId())); + + tq.add(dummyTask); tq.add(task); - TaskStatus status; + TaskStatus retVal = null; - try { - while ((status = tsqa.getStatus(task.getId()).get()).isRunnable()) { - if (System.currentTimeMillis() > startTime + 10 * 1000) { - throw new ISE("Where did the task go?!: %s", task.getId()); + for (final String taskId : ImmutableList.of(dummyTask.getId(), task.getId())) { + try { + TaskStatus status; + while ((status = tsqa.getStatus(taskId).get()).isRunnable()) { + if (System.currentTimeMillis() > startTime + 10 * 1000) { + throw new ISE("Where did the task go?!: %s", task.getId()); + } + + Thread.sleep(100); } - - Thread.sleep(100); + if (taskId.equals(task.getId())) { + retVal = status; + } + } + catch (Exception e) { + throw Throwables.propagate(e); } } - catch (Exception e) { - throw Throwables.propagate(e); - } - return status; + return retVal; } private static class MockIndexerDBCoordinator extends IndexerDBCoordinator