diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java index 82a3c947bc3..231cbd44102 100644 --- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java +++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java @@ -1,9 +1,10 @@ package com.metamx.druid.indexer.data; +import com.metamx.common.exception.FormattedException; import com.metamx.druid.input.InputRow; public interface InputRowParser { - public InputRow parse(T input); + public InputRow parse(T input) throws FormattedException; public void addDimensionExclusion(String dimension); } diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java index 60a97c131bf..b2d9586f272 100644 --- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java +++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.metamx.common.exception.FormattedException; import com.metamx.druid.input.InputRow; import com.metamx.druid.input.MapBasedInputRow; import org.joda.time.DateTime; @@ -37,21 +38,30 @@ public class MapInputRowParser implements InputRowParser> } @Override - public InputRow parse(Map theMap) + public InputRow parse(Map theMap) throws FormattedException { final List dimensions = dataSpec.hasCustomDimensions() ? dataSpec.getDimensions() : Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions)); - final DateTime timestamp = timestampSpec.extractTimestamp(theMap); - if (timestamp == null) { - final String input = theMap.toString(); - throw new NullPointerException( - String.format( - "Null timestamp in input: %s", - input.length() < 100 ? input : input.substring(0, 100) + "..." - ) - ); + final DateTime timestamp; + try { + timestamp = timestampSpec.extractTimestamp(theMap); + if (timestamp == null) { + final String input = theMap.toString(); + throw new NullPointerException( + String.format( + "Null timestamp in input: %s", + input.length() < 100 ? input : input.substring(0, 100) + "..." + ) + ); + } + } + catch (Exception e) { + throw new FormattedException.Builder() + .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_TIMESTAMP) + .withMessage(e.toString()) + .build(); } return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap); diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java index 0874a900f42..f40fd75998c 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java @@ -164,32 +164,37 @@ public class RealtimeManager implements QuerySegmentWalker final InputRow inputRow; try { inputRow = firehose.nextRow(); - - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if (sink == null) { - metrics.incrementThrownAway(); - log.debug("Throwing away event[%s]", inputRow); - - if (System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - - continue; - } - - int currCount = sink.add(inputRow); - metrics.incrementProcessed(); - if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } } catch (FormattedException e) { log.info(e, "unparseable line: %s", e.getDetails()); metrics.incrementUnparseable(); continue; } + catch (Exception e) { + log.info(e, "thrown away line due to exception"); + metrics.incrementThrownAway(); + continue; + } + + final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); + if (sink == null) { + metrics.incrementThrownAway(); + log.debug("Throwing away event[%s]", inputRow); + + if (System.currentTimeMillis() > nextFlush) { + plumber.persist(firehose.commit()); + nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); + } + + continue; + } + + int currCount = sink.add(inputRow); + metrics.incrementProcessed(); + if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { + plumber.persist(firehose.commit()); + nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); + } } } catch (RuntimeException e) { log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())