From 623e89aa54a4d836cbc94601d44e1816b150fdb7 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Thu, 3 Mar 2016 15:15:49 -0600 Subject: [PATCH] skip corrupt message --- docs/content/ingestion/stream-pull.md | 2 ++ .../kafka/KafkaEightFirehoseFactory.java | 20 +++++++++++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/docs/content/ingestion/stream-pull.md b/docs/content/ingestion/stream-pull.md index 10cee8dc209..1fed08d11c1 100644 --- a/docs/content/ingestion/stream-pull.md +++ b/docs/content/ingestion/stream-pull.md @@ -295,6 +295,8 @@ Otherwise, you can run real-time nodes without replication. There is now also an [experimental low level Kafka firehose](../development/kafka-simple-consumer-firehose.html) which solves the issues described above with using the high level Kafka consumer. +Please note that Druid will skip over any event that fails its checksum, because such an event is corrupt. + ### Locking Using stream pull ingestion with Realtime nodes together batch ingestion may introduce data override issues. For example, if you diff --git a/extensions/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/extensions/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java index 6d84b6c4667..9bd4c5c1d42 100644 --- a/extensions/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ b/extensions/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java @@ -34,6 +34,7 @@ import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.InvalidMessageException; import java.io.IOException; import java.nio.ByteBuffer; @@ -111,13 +112,24 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory