Merge pull request #332 from liquidm/improve-kafka

improve kafka intake stability
This commit is contained in:
fjy 2013-12-18 15:30:03 -08:00
commit 4a07f02cb5
2 changed files with 18 additions and 2 deletions

View File

@ -107,7 +107,15 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory
return null; return null;
} }
return parser.parse(ByteBuffer.wrap(message)); try {
return parser.parse(ByteBuffer.wrap(message));
}
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Error parsing[%s], got [%s]", ByteBuffer.wrap(message), e.toString()))
.build();
}
} }
@Override @Override

View File

@ -120,7 +120,15 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory
public InputRow parseMessage(Message message) throws FormattedException public InputRow parseMessage(Message message) throws FormattedException
{ {
return parser.parse(message.payload()); try {
return parser.parse(message.payload());
}
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Error parsing[%s], got [%s]", message.payload(), e.toString()))
.build();
}
} }
@Override @Override