skip corrupt message

This commit is contained in:
Slim Bouguerra 2016-03-03 15:15:49 -06:00
parent 703dc7a48f
commit 623e89aa54
2 changed files with 18 additions and 4 deletions

View File

@ -295,6 +295,8 @@ Otherwise, you can run real-time nodes without replication.
There is now also an [experimental low level Kafka firehose](../development/kafka-simple-consumer-firehose.html) which
solves the issues described above with using the high level Kafka consumer.
Please note that druid will skip over event that failed its checksum and it is corrupt.
### Locking
Using stream pull ingestion with Realtime nodes together batch ingestion may introduce data override issues. For example, if you

View File

@ -34,6 +34,7 @@ import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.InvalidMessageException;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -111,13 +112,24 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
@Override
public InputRow nextRow()
{
final byte[] message = iter.next().message();
try {
final byte[] message = iter.next().message();
if (message == null) {
if (message == null) {
return null;
}
return theParser.parse(ByteBuffer.wrap(message));
}
catch (InvalidMessageException e) {
/*
IF the CRC is caused within the wire transfer, this is not the best way to handel CRC.
Probably it is better to shutdown the fireHose without commit and start it again.
*/
log.error(e,"Message failed its checksum and it is corrupt, will skip it");
return null;
}
return theParser.parse(ByteBuffer.wrap(message));
}
@Override