From af202d7576f626561c22d50f6611e2885630f7f3 Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Tue, 17 Dec 2013 15:27:17 +0100 Subject: [PATCH] improve kafka intake stability --- .../firehose/kafka/KafkaEightFirehoseFactory.java | 10 +++++++++- .../firehose/kafka/KafkaSevenFirehoseFactory.java | 10 +++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java index c8bf876cc92..86c165c04d8 100644 --- a/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ b/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java @@ -107,7 +107,15 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory return null; } - return parser.parse(ByteBuffer.wrap(message)); + try { + return parser.parse(ByteBuffer.wrap(message)); + } + catch (Exception e) { + throw new FormattedException.Builder() + .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW) + .withMessage(String.format("Error parsing[%s], got [%s]", ByteBuffer.wrap(message), e.toString())) + .build(); + } } @Override diff --git a/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java b/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java index 8f200d1cdbc..c227b323877 100644 --- a/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java +++ b/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java @@ -120,7 +120,15 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory public InputRow parseMessage(Message message) throws FormattedException { - return parser.parse(message.payload()); + try { + return parser.parse(message.payload()); + } + catch (Exception e) { + throw new FormattedException.Builder() + .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW) + .withMessage(String.format("Error parsing[%s], got [%s]", message.payload(), e.toString())) + .build(); + } } @Override