diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 9a53a3a6392..cfec3e08322 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -117,7 +117,13 @@ public class RealtimeIndexTask extends AbstractTask private volatile Firehose firehose = null; @JsonIgnore - private volatile boolean stopped = false; + private volatile boolean gracefullyStopped = false; + + @JsonIgnore + private volatile boolean finishingJob = false; + + @JsonIgnore + private volatile Thread runThread = null; @JsonIgnore private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null; @@ -174,16 +180,12 @@ public class RealtimeIndexTask extends AbstractTask @Override public TaskStatus run(final TaskToolbox toolbox) throws Exception { + runThread = Thread.currentThread(); + if (this.plumber != null) { throw new IllegalStateException("WTF?!? run with non-null plumber??!"); } - // Shed any locks we might have (e.g. if we were uncleanly killed and restarted) since we'll reacquire - // them if we actually need them - for (final TaskLock taskLock : getTaskLocks(toolbox)) { - toolbox.getTaskActionClient().submit(new LockReleaseAction(taskLock.getInterval())); - } - boolean normalExit = true; // It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for @@ -320,7 +322,7 @@ public class RealtimeIndexTask extends AbstractTask committerSupplier = Committers.supplierFromFirehose(firehose); // Time to read data! - while ((!stopped || firehoseDrainableByClosing) && firehose.hasMore()) { + while ((!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) { final InputRow inputRow; try { @@ -357,39 +359,55 @@ public class RealtimeIndexTask extends AbstractTask finally { if (normalExit) { try { - if (!stopped) { - // Hand off all pending data - log.info("Persisting and handing off pending data."); - plumber.persist(committerSupplier.get()); - plumber.finishJob(); - } else { - log.info("Persisting pending data without handoff, in preparation for restart."); - final Committer committer = committerSupplier.get(); - final CountDownLatch persistLatch = new CountDownLatch(1); - plumber.persist( - new Committer() - { - @Override - public Object getMetadata() - { - return committer.getMetadata(); - } + // Always want to persist. + log.info("Persisting remaining data."); - @Override - public void run() - { - try { - committer.run(); - } - finally { - persistLatch.countDown(); - } + final Committer committer = committerSupplier.get(); + final CountDownLatch persistLatch = new CountDownLatch(1); + plumber.persist( + new Committer() + { + @Override + public Object getMetadata() + { + return committer.getMetadata(); + } + + @Override + public void run() + { + try { + committer.run(); + } + finally { + persistLatch.countDown(); } } - ); - persistLatch.await(); + } + ); + persistLatch.await(); + + if (gracefullyStopped) { + log.info("Gracefully stopping."); + } else { + log.info("Finishing the job."); + synchronized (this) { + if (gracefullyStopped) { + // Someone called stopGracefully after we checked the flag. That's okay, just stop now. + log.info("Gracefully stopping."); + } else { + finishingJob = true; + } + } + + if (finishingJob) { + plumber.finishJob(); + } } } + catch (InterruptedException e) { + log.debug(e, "Interrupted while finishing the job"); + } catch (Exception e) { log.makeAlert(e, "Failed to finish realtime task").emit(); throw e; @@ -417,13 +435,17 @@ public class RealtimeIndexTask extends AbstractTask { try { synchronized (this) { - if (!stopped) { - stopped = true; - log.info("Gracefully stopping."); - if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) { + if (!gracefullyStopped) { + gracefullyStopped = true; + if (finishingJob) { + log.info("stopGracefully: Interrupting finishJob."); + runThread.interrupt(); + } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) { + log.info("stopGracefully: Draining firehose."); firehose.close(); } else { - log.debug("Cannot drain firehose[%s] by closing, so skipping closing.", firehose); + log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread."); + runThread.interrupt(); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index dc8c559a236..64f133d90af 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -90,8 +90,10 @@ public class TaskLockbox try { // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks. + final Set storedActiveTasks = Sets.newHashSet(); final List> storedLocks = Lists.newArrayList(); for (final Task task : taskStorage.getActiveTasks()) { + storedActiveTasks.add(task.getId()); for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) { storedLocks.add(Pair.of(task, taskLock)); } @@ -111,6 +113,7 @@ public class TaskLockbox }; running.clear(); activeTasks.clear(); + activeTasks.addAll(storedActiveTasks); // Bookkeeping for a log message at the end int taskLockCount = 0; for (final Pair taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) { @@ -121,7 +124,6 @@ public class TaskLockbox log.warn("WTF?! Got lock with empty interval for task: %s", task.getId()); continue; } - activeTasks.add(task.getId()); final Optional acquiredTaskLock = tryLock( task, savedTaskLock.getInterval(), diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index cc8addd1f5a..b41b753cf7c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -24,6 +24,8 @@ import com.google.api.client.util.Charsets; import com.google.api.client.util.Sets; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListenableFuture; @@ -63,6 +65,7 @@ import io.druid.indexing.test.TestDataSegmentKiller; import io.druid.indexing.test.TestDataSegmentPusher; import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator; import io.druid.jackson.DefaultObjectMapper; +import io.druid.metadata.EntryExistsException; import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.Druids; import io.druid.query.IntervalChunkingQueryRunnerDecorator; @@ -96,6 +99,7 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.metrics.EventReceiverFirehoseRegister; +import io.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Period; @@ -175,6 +179,7 @@ public class RealtimeIndexTaskTest final RealtimeIndexTask task = makeRealtimeTask(null); final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); final ListenableFuture statusFuture = runTask(task, taskToolbox); + final DataSegment publishedSegment; // Wait for firehose to show up, it starts off null. while (task.getFirehose() == null) { @@ -207,11 +212,22 @@ public class RealtimeIndexTaskTest Thread.sleep(50); } + publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + // Do a query. Assert.assertEquals(2, countEvents(task)); // Simulate handoff. - for(Pair executorRunnablePair : handOffCallbacks.values()){ + for (Map.Entry> entry : handOffCallbacks.entrySet()) { + final Pair executorRunnablePair = entry.getValue(); + Assert.assertEquals( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ), + entry.getKey() + ); executorRunnablePair.lhs.execute(executorRunnablePair.rhs); } handOffCallbacks.clear(); @@ -226,6 +242,7 @@ public class RealtimeIndexTaskTest { final File directory = tempFolder.newFolder(); final RealtimeIndexTask task1 = makeRealtimeTask(null); + final DataSegment publishedSegment; // First run: { @@ -298,11 +315,123 @@ public class RealtimeIndexTaskTest Thread.sleep(50); } + publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + // Do a query. Assert.assertEquals(2, countEvents(task2)); // Simulate handoff. - for(Pair executorRunnablePair : handOffCallbacks.values()){ + for (Map.Entry> entry : handOffCallbacks.entrySet()) { + final Pair executorRunnablePair = entry.getValue(); + Assert.assertEquals( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ), + entry.getKey() + ); + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode()); + } + } + + @Test(timeout = 10000L) + public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception + { + final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); + final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final File directory = tempFolder.newFolder(); + final RealtimeIndexTask task1 = makeRealtimeTask(null); + final DataSegment publishedSegment; + + // First run: + { + final TaskToolbox taskToolbox = makeToolbox(task1, taskStorage, mdc, directory); + final ListenableFuture statusFuture = runTask(task1, taskToolbox); + + // Wait for firehose to show up, it starts off null. + while (task1.getFirehose() == null) { + Thread.sleep(50); + } + + final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = + (EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose(); + + firehose.addRows( + ImmutableList.of( + new MapBasedInputRow( + now, + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo") + ) + ) + ); + + // Stop the firehose, this will trigger a finishJob. + firehose.close(); + + // Wait for publish. + while (mdc.getPublished().isEmpty()) { + Thread.sleep(50); + } + + publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + + // Do a query. + Assert.assertEquals(1, countEvents(task1)); + + // Trigger graceful shutdown. + task1.stopGracefully(); + + // Wait for the task to finish. The status doesn't really matter. + while (!statusFuture.isDone()) { + Thread.sleep(50); + } + } + + // Second run: + { + final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); + final TaskToolbox taskToolbox = makeToolbox(task2, taskStorage, mdc, directory); + final ListenableFuture statusFuture = runTask(task2, taskToolbox); + + // Wait for firehose to show up, it starts off null. + while (task2.getFirehose() == null) { + Thread.sleep(50); + } + + // Stop the firehose again, this will start another handoff. + final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = + (EventReceiverFirehoseFactory.EventReceiverFirehose) task2.getFirehose(); + + // Stop the firehose, this will trigger a finishJob. + firehose.close(); + + // publishedSegment is still published. No reason it shouldn't be. + Assert.assertEquals(ImmutableSet.of(publishedSegment), mdc.getPublished()); + + // Wait for a handoffCallback to show up. + while (handOffCallbacks.isEmpty()) { + Thread.sleep(50); + } + + // Simulate handoff. + for (Map.Entry> entry : handOffCallbacks.entrySet()) { + final Pair executorRunnablePair = entry.getValue(); + Assert.assertEquals( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ), + entry.getKey() + ); executorRunnablePair.lhs.execute(executorRunnablePair.rhs); } handOffCallbacks.clear(); @@ -452,11 +581,36 @@ public class RealtimeIndexTaskTest ); } - private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoordinator mdc, final File directory) + private TaskToolbox makeToolbox( + final Task task, + final IndexerMetadataStorageCoordinator mdc, + final File directory + ) + { + return makeToolbox( + task, + new HeapMemoryTaskStorage(new TaskStorageConfig(null)), + mdc, + directory + ); + } + + private TaskToolbox makeToolbox( + final Task task, + final TaskStorage taskStorage, + final IndexerMetadataStorageCoordinator mdc, + final File directory + ) { - final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); + try { + taskStorage.insert(task, TaskStatus.running(task.getId())); + } + catch (EntryExistsException e) { + // suppress + } + taskLockbox.syncFromStorage(); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, mdc, @@ -564,7 +718,6 @@ public class RealtimeIndexTaskTest new CacheConfig() ); - taskLockbox.add(task); return toolboxFactory.build(task); }