Merge pull request #2141 from gianm/fix-restoring-realtime

Fix some problems with restoring
This commit is contained in:
Nishant 2015-12-30 10:44:45 +05:30
commit df893dbaf8
3 changed files with 224 additions and 47 deletions

View File

@ -117,7 +117,13 @@ public class RealtimeIndexTask extends AbstractTask
private volatile Firehose firehose = null;
@JsonIgnore
private volatile boolean stopped = false;
private volatile boolean gracefullyStopped = false;
@JsonIgnore
private volatile boolean finishingJob = false;
@JsonIgnore
private volatile Thread runThread = null;
@JsonIgnore
private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null;
@ -174,16 +180,12 @@ public class RealtimeIndexTask extends AbstractTask
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
runThread = Thread.currentThread();
if (this.plumber != null) {
throw new IllegalStateException("WTF?!? run with non-null plumber??!");
}
// Shed any locks we might have (e.g. if we were uncleanly killed and restarted) since we'll reacquire
// them if we actually need them
for (final TaskLock taskLock : getTaskLocks(toolbox)) {
toolbox.getTaskActionClient().submit(new LockReleaseAction(taskLock.getInterval()));
}
boolean normalExit = true;
// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
@ -320,7 +322,7 @@ public class RealtimeIndexTask extends AbstractTask
committerSupplier = Committers.supplierFromFirehose(firehose);
// Time to read data!
while ((!stopped || firehoseDrainableByClosing) && firehose.hasMore()) {
while ((!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
final InputRow inputRow;
try {
@ -357,39 +359,55 @@ public class RealtimeIndexTask extends AbstractTask
finally {
if (normalExit) {
try {
if (!stopped) {
// Hand off all pending data
log.info("Persisting and handing off pending data.");
plumber.persist(committerSupplier.get());
plumber.finishJob();
} else {
log.info("Persisting pending data without handoff, in preparation for restart.");
final Committer committer = committerSupplier.get();
final CountDownLatch persistLatch = new CountDownLatch(1);
plumber.persist(
new Committer()
{
@Override
public Object getMetadata()
{
return committer.getMetadata();
}
// Always want to persist.
log.info("Persisting remaining data.");
@Override
public void run()
{
try {
committer.run();
}
finally {
persistLatch.countDown();
}
final Committer committer = committerSupplier.get();
final CountDownLatch persistLatch = new CountDownLatch(1);
plumber.persist(
new Committer()
{
@Override
public Object getMetadata()
{
return committer.getMetadata();
}
@Override
public void run()
{
try {
committer.run();
}
finally {
persistLatch.countDown();
}
}
);
persistLatch.await();
}
);
persistLatch.await();
if (gracefullyStopped) {
log.info("Gracefully stopping.");
} else {
log.info("Finishing the job.");
synchronized (this) {
if (gracefullyStopped) {
// Someone called stopGracefully after we checked the flag. That's okay, just stop now.
log.info("Gracefully stopping.");
} else {
finishingJob = true;
}
}
if (finishingJob) {
plumber.finishJob();
}
}
}
catch (InterruptedException e) {
log.debug(e, "Interrupted while finishing the job");
}
catch (Exception e) {
log.makeAlert(e, "Failed to finish realtime task").emit();
throw e;
@ -417,13 +435,17 @@ public class RealtimeIndexTask extends AbstractTask
{
try {
synchronized (this) {
if (!stopped) {
stopped = true;
log.info("Gracefully stopping.");
if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
if (!gracefullyStopped) {
gracefullyStopped = true;
if (finishingJob) {
log.info("stopGracefully: Interrupting finishJob.");
runThread.interrupt();
} else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
log.info("stopGracefully: Draining firehose.");
firehose.close();
} else {
log.debug("Cannot drain firehose[%s] by closing, so skipping closing.", firehose);
log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
runThread.interrupt();
}
}
}

View File

@ -90,8 +90,10 @@ public class TaskLockbox
try {
// Load stuff from taskStorage first. If this fails, we don't want to lose all our locks.
final Set<String> storedActiveTasks = Sets.newHashSet();
final List<Pair<Task, TaskLock>> storedLocks = Lists.newArrayList();
for (final Task task : taskStorage.getActiveTasks()) {
storedActiveTasks.add(task.getId());
for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) {
storedLocks.add(Pair.of(task, taskLock));
}
@ -111,6 +113,7 @@ public class TaskLockbox
};
running.clear();
activeTasks.clear();
activeTasks.addAll(storedActiveTasks);
// Bookkeeping for a log message at the end
int taskLockCount = 0;
for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
@ -121,7 +124,6 @@ public class TaskLockbox
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
continue;
}
activeTasks.add(task.getId());
final Optional<TaskLock> acquiredTaskLock = tryLock(
task,
savedTaskLock.getInterval(),

View File

@ -24,6 +24,8 @@ import com.google.api.client.util.Charsets;
import com.google.api.client.util.Sets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
@ -63,6 +65,7 @@ import io.druid.indexing.test.TestDataSegmentKiller;
import io.druid.indexing.test.TestDataSegmentPusher;
import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.EntryExistsException;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
import io.druid.query.Druids;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
@ -96,6 +99,7 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.metrics.EventReceiverFirehoseRegister;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
@ -175,6 +179,7 @@ public class RealtimeIndexTaskTest
final RealtimeIndexTask task = makeRealtimeTask(null);
final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);
final DataSegment publishedSegment;
// Wait for firehose to show up, it starts off null.
while (task.getFirehose() == null) {
@ -207,11 +212,22 @@ public class RealtimeIndexTaskTest
Thread.sleep(50);
}
publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
// Do a query.
Assert.assertEquals(2, countEvents(task));
// Simulate handoff.
for(Pair<Executor, Runnable> executorRunnablePair : handOffCallbacks.values()){
for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
final Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
Assert.assertEquals(
new SegmentDescriptor(
publishedSegment.getInterval(),
publishedSegment.getVersion(),
publishedSegment.getShardSpec().getPartitionNum()
),
entry.getKey()
);
executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
}
handOffCallbacks.clear();
@ -226,6 +242,7 @@ public class RealtimeIndexTaskTest
{
final File directory = tempFolder.newFolder();
final RealtimeIndexTask task1 = makeRealtimeTask(null);
final DataSegment publishedSegment;
// First run:
{
@ -298,11 +315,123 @@ public class RealtimeIndexTaskTest
Thread.sleep(50);
}
publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
// Do a query.
Assert.assertEquals(2, countEvents(task2));
// Simulate handoff.
for(Pair<Executor, Runnable> executorRunnablePair : handOffCallbacks.values()){
for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
final Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
Assert.assertEquals(
new SegmentDescriptor(
publishedSegment.getInterval(),
publishedSegment.getVersion(),
publishedSegment.getShardSpec().getPartitionNum()
),
entry.getKey()
);
executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
}
handOffCallbacks.clear();
// Wait for the task to finish.
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());
}
}
@Test(timeout = 10000L)
public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception
{
final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final File directory = tempFolder.newFolder();
final RealtimeIndexTask task1 = makeRealtimeTask(null);
final DataSegment publishedSegment;
// First run:
{
final TaskToolbox taskToolbox = makeToolbox(task1, taskStorage, mdc, directory);
final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);
// Wait for firehose to show up, it starts off null.
while (task1.getFirehose() == null) {
Thread.sleep(50);
}
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo")
)
)
);
// Stop the firehose, this will trigger a finishJob.
firehose.close();
// Wait for publish.
while (mdc.getPublished().isEmpty()) {
Thread.sleep(50);
}
publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
// Do a query.
Assert.assertEquals(1, countEvents(task1));
// Trigger graceful shutdown.
task1.stopGracefully();
// Wait for the task to finish. The status doesn't really matter.
while (!statusFuture.isDone()) {
Thread.sleep(50);
}
}
// Second run:
{
final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
final TaskToolbox taskToolbox = makeToolbox(task2, taskStorage, mdc, directory);
final ListenableFuture<TaskStatus> statusFuture = runTask(task2, taskToolbox);
// Wait for firehose to show up, it starts off null.
while (task2.getFirehose() == null) {
Thread.sleep(50);
}
// Stop the firehose again, this will start another handoff.
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task2.getFirehose();
// Stop the firehose, this will trigger a finishJob.
firehose.close();
// publishedSegment is still published. No reason it shouldn't be.
Assert.assertEquals(ImmutableSet.of(publishedSegment), mdc.getPublished());
// Wait for a handoffCallback to show up.
while (handOffCallbacks.isEmpty()) {
Thread.sleep(50);
}
// Simulate handoff.
for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
final Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
Assert.assertEquals(
new SegmentDescriptor(
publishedSegment.getInterval(),
publishedSegment.getVersion(),
publishedSegment.getShardSpec().getPartitionNum()
),
entry.getKey()
);
executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
}
handOffCallbacks.clear();
@ -452,11 +581,36 @@ public class RealtimeIndexTaskTest
);
}
private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoordinator mdc, final File directory)
/**
 * Builds a {@link TaskToolbox} for {@code task} using a newly created
 * {@link HeapMemoryTaskStorage}, then delegates to the four-argument overload.
 *
 * @param task      the task the toolbox is built for
 * @param mdc       metadata storage coordinator passed through to the delegate
 * @param directory directory passed through to the delegate
 * @return the toolbox produced by the four-argument overload
 */
private TaskToolbox makeToolbox(
    final Task task,
    final IndexerMetadataStorageCoordinator mdc,
    final File directory
)
{
  // A new storage instance is created on every call, so nothing is shared with other toolboxes.
  final TaskStorage isolatedStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
  return makeToolbox(task, isolatedStorage, mdc, directory);
}
private TaskToolbox makeToolbox(
final Task task,
final TaskStorage taskStorage,
final IndexerMetadataStorageCoordinator mdc,
final File directory
)
{
final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));
}
catch (EntryExistsException e) {
// suppress
}
taskLockbox.syncFromStorage();
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
mdc,
@ -564,7 +718,6 @@ public class RealtimeIndexTaskTest
new CacheConfig()
);
taskLockbox.add(task);
return toolboxFactory.build(task);
}