KafkaIndexTask: Treat null values as unparseable. (#3453)

This commit is contained in:
Gian Merlino 2016-09-13 10:56:38 -07:00 committed by Fangjin Yang
parent ba6ddf307e
commit bcff08826b
2 changed files with 11 additions and 5 deletions

View File

@ -403,7 +403,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
} }
try { try {
final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row"); final byte[] valueBytes = record.value();
if (valueBytes == null) {
throw new ParseException("null value");
}
final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(valueBytes)), "row");
if (!ioConfig.getMinimumMessageTime().isPresent() || if (!ioConfig.getMinimumMessageTime().isPresent() ||
!ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp())) { !ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp())) {

View File

@ -173,6 +173,7 @@ public class KafkaIndexTaskTest
new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "d", "y", 1.0f)), new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "d", "y", 1.0f)),
new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "e", "y", 1.0f)), new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "e", "y", 1.0f)),
new ProducerRecord<byte[], byte[]>("topic0", 0, null, "unparseable".getBytes()), new ProducerRecord<byte[], byte[]>("topic0", 0, null, "unparseable".getBytes()),
new ProducerRecord<byte[], byte[]>("topic0", 0, null, null),
new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2013", "f", "y", 1.0f)), new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2013", "f", "y", 1.0f)),
new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2012", "g", "y", 1.0f)), new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2012", "g", "y", 1.0f)),
new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2011", "h", "y", 1.0f)) new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2011", "h", "y", 1.0f))
@ -705,7 +706,7 @@ public class KafkaIndexTaskTest
new KafkaIOConfig( new KafkaIOConfig(
"sequence1", "sequence1",
new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)),
new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 8L)),
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
@ -734,7 +735,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable()); Assert.assertEquals(2, task2.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway());
// Check published segments & metadata, should all be from the first task // Check published segments & metadata, should all be from the first task
@ -772,7 +773,7 @@ public class KafkaIndexTaskTest
new KafkaIOConfig( new KafkaIOConfig(
"sequence1", "sequence1",
new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)),
new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 8L)),
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
false, false,
false, false,
@ -807,7 +808,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable()); Assert.assertEquals(2, task2.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway());
// Check published segments & metadata // Check published segments & metadata