Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner (#6258)

Jonathan Wei 2018-08-28 16:12:02 -07:00 committed by GitHub
parent 80224df36a
commit c9a27e3e8e
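
The problem this change addresses: if an exception is already propagating and a cleanup step in a catch or finally block also throws, the cleanup exception replaces the original one, hiding the real cause of the task failure. The fix captures the primary exception and attaches any cleanup failure to it with Throwable.addSuppressed(), so the original error still reaches the caller and the logs. A minimal standalone sketch of the pattern this commit applies (doWork/cleanup are hypothetical placeholders, not Druid code):

public class SuppressionSketch
{
  public static void main(String[] args) throws Exception
  {
    Throwable caughtException = null;
    try {
      doWork(); // the failure we actually care about
    }
    catch (Exception e) {
      caughtException = e; // remember the primary exception
      throw e;
    }
    finally {
      try {
        cleanup(); // rethrowing this directly would mask the primary exception
      }
      catch (Exception e) {
        if (caughtException != null) {
          caughtException.addSuppressed(e); // keep the cleanup failure as secondary info
        } else {
          throw e; // nothing else failed, so the cleanup failure is the main error
        }
      }
    }
  }

  private static void doWork() { throw new RuntimeException("primary failure"); }

  private static void cleanup() { throw new IllegalStateException("cleanup failure"); }
}

Running this terminates with the RuntimeException ("primary failure"), and the default stack trace lists the IllegalStateException under a "Suppressed:" entry instead of letting it replace the original error.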

@@ -317,6 +317,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
         )
     );
+    Throwable caughtExceptionOuter = null;
     try (final KafkaConsumer<byte[], byte[]> consumer = task.newConsumer()) {
       toolbox.getDataSegmentServerAnnouncer().announce();
       toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
@@ -412,6 +413,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
       // Could eventually support leader/follower mode (for keeping replicas more in sync)
       boolean stillReading = !assignment.isEmpty();
       status = Status.READING;
+      Throwable caughtExceptionInner = null;
       try {
         while (stillReading) {
           if (possiblyPause()) {
@@ -616,12 +618,22 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
       }
       catch (Exception e) {
         // (1) catch all exceptions while reading from kafka
+        caughtExceptionInner = e;
         log.error(e, "Encountered exception in run() before persisting.");
         throw e;
       }
       finally {
         log.info("Persisting all pending data");
-        driver.persist(committerSupplier.get()); // persist pending data
+        try {
+          driver.persist(committerSupplier.get()); // persist pending data
+        }
+        catch (Exception e) {
+          if (caughtExceptionInner != null) {
+            caughtExceptionInner.addSuppressed(e);
+          } else {
+            throw e;
+          }
+        }
       }
       synchronized (statusLock) {
@@ -687,9 +699,18 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
     catch (InterruptedException | RejectedExecutionException e) {
       // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including
       // the final publishing.
-      Futures.allAsList(publishWaitList).cancel(true);
-      Futures.allAsList(handOffWaitList).cancel(true);
-      appenderator.closeNow();
+      caughtExceptionOuter = e;
+      try {
+        Futures.allAsList(publishWaitList).cancel(true);
+        Futures.allAsList(handOffWaitList).cancel(true);
+        if (appenderator != null) {
+          appenderator.closeNow();
+        }
+      }
+      catch (Exception e2) {
+        e.addSuppressed(e2);
+      }
       // handle the InterruptedException that gets wrapped in a RejectedExecutionException
       if (e instanceof RejectedExecutionException
           && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
@@ -706,21 +727,38 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
     }
     catch (Exception e) {
       // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing.
-      Futures.allAsList(publishWaitList).cancel(true);
-      Futures.allAsList(handOffWaitList).cancel(true);
-      appenderator.closeNow();
+      caughtExceptionOuter = e;
+      try {
+        Futures.allAsList(publishWaitList).cancel(true);
+        Futures.allAsList(handOffWaitList).cancel(true);
+        if (appenderator != null) {
+          appenderator.closeNow();
+        }
+      }
+      catch (Exception e2) {
+        e.addSuppressed(e2);
+      }
       throw e;
     }
     finally {
-      if (driver != null) {
-        driver.close();
-      }
-      if (chatHandlerProvider.isPresent()) {
-        chatHandlerProvider.get().unregister(task.getId());
-      }
+      try {
+        if (driver != null) {
+          driver.close();
+        }
+        if (chatHandlerProvider.isPresent()) {
+          chatHandlerProvider.get().unregister(task.getId());
+        }
-      toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
-      toolbox.getDataSegmentServerAnnouncer().unannounce();
+        toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
+        toolbox.getDataSegmentServerAnnouncer().unannounce();
+      }
+      catch (Exception e) {
+        if (caughtExceptionOuter != null) {
+          caughtExceptionOuter.addSuppressed(e);
+        } else {
+          throw e;
+        }
+      }
     }
     toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(null));
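
As an aside, the KafkaConsumer above is opened in a try-with-resources block, and that construct already does this bookkeeping automatically: if both the body and close() throw, the close() failure is attached to the primary exception as suppressed. A small illustrative sketch of that behavior (not Druid code, using a throwing lambda as the resource):

public class TryWithResourcesSketch
{
  public static void main(String[] args) throws Exception
  {
    try (AutoCloseable resource = () -> { throw new IllegalStateException("close failure"); }) {
      throw new RuntimeException("primary failure");
    }
    // Terminates with: java.lang.RuntimeException: primary failure
    //   Suppressed: java.lang.IllegalStateException: close failure
  }
}

Cleanup done in plain catch and finally blocks (cancelling the publish/hand-off futures, appenderator.closeNow(), driver.close(), the unannounce calls) gets no such compiler support, which is why the explicit caughtExceptionOuter/caughtExceptionInner handling above is needed.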