diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java index f40fd75998c..55c8af27dcd 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java @@ -163,38 +163,40 @@ public class RealtimeManager implements QuerySegmentWalker while (firehose.hasMore()) { final InputRow inputRow; try { - inputRow = firehose.nextRow(); + try { + inputRow = firehose.nextRow(); + } + catch (Exception e) { + log.info(e, "thrown away line due to exception"); + metrics.incrementThrownAway(); + continue; + } + + final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); + if (sink == null) { + metrics.incrementThrownAway(); + log.debug("Throwing away event[%s]", inputRow); + + if (System.currentTimeMillis() > nextFlush) { + plumber.persist(firehose.commit()); + nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); + } + + continue; + } + + int currCount = sink.add(inputRow); + metrics.incrementProcessed(); + if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { + plumber.persist(firehose.commit()); + nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); + } } catch (FormattedException e) { log.info(e, "unparseable line: %s", e.getDetails()); metrics.incrementUnparseable(); continue; } - catch (Exception e) { - log.info(e, "thrown away line due to exception"); - metrics.incrementThrownAway(); - continue; - } - - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if (sink == null) { - metrics.incrementThrownAway(); - log.debug("Throwing away event[%s]", inputRow); - - if (System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - - continue; - } - - int currCount = sink.add(inputRow); - metrics.incrementProcessed(); - if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } } } catch (RuntimeException e) { log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())