Merge pull request #212 from metamx/realtime-npe

Rejigger exception handling in standalone realtime
This commit is contained in:
cheddar 2013-08-06 17:39:01 -07:00
commit 02f9cc3c7a
3 changed files with 47 additions and 31 deletions

View File

@@ -1,9 +1,10 @@
package com.metamx.druid.indexer.data; package com.metamx.druid.indexer.data;
import com.metamx.common.exception.FormattedException;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
public interface InputRowParser<T> public interface InputRowParser<T>
{ {
public InputRow parse(T input); public InputRow parse(T input) throws FormattedException;
public void addDimensionExclusion(String dimension); public void addDimensionExclusion(String dimension);
} }

View File

@@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.input.MapBasedInputRow;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@@ -37,21 +38,30 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
} }
@Override @Override
public InputRow parse(Map<String, Object> theMap) public InputRow parse(Map<String, Object> theMap) throws FormattedException
{ {
final List<String> dimensions = dataSpec.hasCustomDimensions() final List<String> dimensions = dataSpec.hasCustomDimensions()
? dataSpec.getDimensions() ? dataSpec.getDimensions()
: Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions)); : Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions));
final DateTime timestamp = timestampSpec.extractTimestamp(theMap); final DateTime timestamp;
if (timestamp == null) { try {
final String input = theMap.toString(); timestamp = timestampSpec.extractTimestamp(theMap);
throw new NullPointerException( if (timestamp == null) {
String.format( final String input = theMap.toString();
"Null timestamp in input: %s", throw new NullPointerException(
input.length() < 100 ? input : input.substring(0, 100) + "..." String.format(
) "Null timestamp in input: %s",
); input.length() < 100 ? input : input.substring(0, 100) + "..."
)
);
}
}
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_TIMESTAMP)
.withMessage(e.toString())
.build();
} }
return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap); return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);

View File

@@ -164,32 +164,37 @@ public class RealtimeManager implements QuerySegmentWalker
final InputRow inputRow; final InputRow inputRow;
try { try {
inputRow = firehose.nextRow(); inputRow = firehose.nextRow();
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
continue;
}
int currCount = sink.add(inputRow);
metrics.incrementProcessed();
if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
} }
catch (FormattedException e) { catch (FormattedException e) {
log.info(e, "unparseable line: %s", e.getDetails()); log.info(e, "unparseable line: %s", e.getDetails());
metrics.incrementUnparseable(); metrics.incrementUnparseable();
continue; continue;
} }
catch (Exception e) {
log.info(e, "thrown away line due to exception");
metrics.incrementThrownAway();
continue;
}
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
continue;
}
int currCount = sink.add(inputRow);
metrics.incrementProcessed();
if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource()) log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())