Merge pull request #2357 from gianm/restorestuff

Fix NPE caused by stopping a RealtimeIndexTask before firehose connects
This commit is contained in:
Charles Allen 2016-01-29 12:34:21 -08:00
commit 6c8a024b84
3 changed files with 56 additions and 29 deletions

View File

@ -325,11 +325,17 @@ public class RealtimeIndexTask extends AbstractTask
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory();
final boolean firehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory);
// Skip connecting firehose if we've been stopped before we got started.
synchronized (this) {
if (!gracefullyStopped) {
firehose = firehoseFactory.connect(spec.getDataSchema().getParser());
committerSupplier = Committers.supplierFromFirehose(firehose);
}
}
// Time to read data!
while ((!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
final InputRow inputRow;
try {
@ -366,7 +372,8 @@ public class RealtimeIndexTask extends AbstractTask
finally {
if (normalExit) {
try {
// Always want to persist.
// Persist if we had actually started.
if (firehose != null) {
log.info("Persisting remaining data.");
final Committer committer = committerSupplier.get();
@ -393,6 +400,7 @@ public class RealtimeIndexTask extends AbstractTask
}
);
persistLatch.await();
}
if (gracefullyStopped) {
log.info("Gracefully stopping.");
@ -420,8 +428,9 @@ public class RealtimeIndexTask extends AbstractTask
throw e;
}
finally {
// firehose will be non-null since normalExit is true
if (firehose != null) {
CloseQuietly.close(firehose);
}
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
}
}
@ -444,7 +453,9 @@ public class RealtimeIndexTask extends AbstractTask
synchronized (this) {
if (!gracefullyStopped) {
gracefullyStopped = true;
if (finishingJob) {
if (firehose == null) {
log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
} else if (finishingJob) {
log.info("stopGracefully: Interrupting finishJob.");
runThread.interrupt();
} else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {

View File

@ -144,7 +144,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
.addData("taskId", task.getId())
.addData("dataSource", task.getDataSource())
.emit();
log.warn(e, "Graceful shutdown of task[%s] aborted with exception.");
log.warn(e, "Graceful shutdown of task[%s] aborted with exception.", task.getId());
error = true;
}
} else {

View File

@ -536,6 +536,22 @@ public class RealtimeIndexTaskTest
}
}
@Test(timeout = 10000L)
public void testStopBeforeStarting() throws Exception
{
final File directory = tempFolder.newFolder();
final RealtimeIndexTask task1 = makeRealtimeTask(null);
task1.stopGracefully();
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory);
final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);
// Wait for the task to finish.
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());
}
private ListenableFuture<TaskStatus> runTask(final Task task, final TaskToolbox toolbox)
{
return taskExec.submit(