mirror of https://github.com/apache/druid.git
RealtimeManager: Handle FormattedExceptions at any stage of processing other than hasMore
This commit is contained in:
parent
d4fc069e01
commit
1d1cb045f0
|
@ -162,14 +162,10 @@ public class RealtimeManager implements QuerySegmentWalker
|
|||
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
|
||||
while (firehose.hasMore()) {
|
||||
final InputRow inputRow;
|
||||
try {
|
||||
try {
|
||||
inputRow = firehose.nextRow();
|
||||
}
|
||||
catch (FormattedException e) {
|
||||
log.info(e, "unparseable line: %s", e.getDetails());
|
||||
metrics.incrementUnparseable();
|
||||
continue;
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.info(e, "thrown away line due to exception");
|
||||
metrics.incrementThrownAway();
|
||||
|
@ -196,6 +192,12 @@ public class RealtimeManager implements QuerySegmentWalker
|
|||
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
|
||||
}
|
||||
}
|
||||
catch (FormattedException e) {
|
||||
log.info(e, "unparseable line: %s", e.getDetails());
|
||||
metrics.incrementUnparseable();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())
|
||||
.emit();
|
||||
|
|
Loading…
Reference in New Issue