mirror of
https://github.com/apache/druid.git
synced 2025-02-23 19:15:02 +00:00
Merge pull request #2589 from b-slim/fix_real_time
Make realtime kafka firehose skip corrupt message
This commit is contained in:
commit
2ad134638d
@ -295,6 +295,8 @@ Otherwise, you can run real-time nodes without replication.
|
|||||||
There is now also an [experimental low level Kafka firehose](../development/kafka-simple-consumer-firehose.html) which
|
There is now also an [experimental low level Kafka firehose](../development/kafka-simple-consumer-firehose.html) which
|
||||||
solves the issues described above with using the high level Kafka consumer.
|
solves the issues described above with using the high level Kafka consumer.
|
||||||
|
|
||||||
|
Please note that druid will skip over event that failed its checksum and it is corrupt.
|
||||||
|
|
||||||
### Locking
|
### Locking
|
||||||
|
|
||||||
Using stream pull ingestion with Realtime nodes together batch ingestion may introduce data override issues. For example, if you
|
Using stream pull ingestion with Realtime nodes together batch ingestion may introduce data override issues. For example, if you
|
||||||
|
@ -34,6 +34,7 @@ import kafka.consumer.ConsumerConfig;
|
|||||||
import kafka.consumer.ConsumerIterator;
|
import kafka.consumer.ConsumerIterator;
|
||||||
import kafka.consumer.KafkaStream;
|
import kafka.consumer.KafkaStream;
|
||||||
import kafka.javaapi.consumer.ConsumerConnector;
|
import kafka.javaapi.consumer.ConsumerConnector;
|
||||||
|
import kafka.message.InvalidMessageException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
@ -111,6 +112,7 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
|
|||||||
@Override
|
@Override
|
||||||
public InputRow nextRow()
|
public InputRow nextRow()
|
||||||
{
|
{
|
||||||
|
try {
|
||||||
final byte[] message = iter.next().message();
|
final byte[] message = iter.next().message();
|
||||||
|
|
||||||
if (message == null) {
|
if (message == null) {
|
||||||
@ -118,6 +120,16 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
|
|||||||
}
|
}
|
||||||
|
|
||||||
return theParser.parse(ByteBuffer.wrap(message));
|
return theParser.parse(ByteBuffer.wrap(message));
|
||||||
|
|
||||||
|
}
|
||||||
|
catch (InvalidMessageException e) {
|
||||||
|
/*
|
||||||
|
IF the CRC is caused within the wire transfer, this is not the best way to handel CRC.
|
||||||
|
Probably it is better to shutdown the fireHose without commit and start it again.
|
||||||
|
*/
|
||||||
|
log.error(e,"Message failed its checksum and it is corrupt, will skip it");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
x
Reference in New Issue
Block a user