From bcff08826be53106faa236332b4d5651ea06789e Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Tue, 13 Sep 2016 10:56:38 -0700
Subject: [PATCH] KafkaIndexTask: Treat null values as unparseable. (#3453)

---
 .../java/io/druid/indexing/kafka/KafkaIndexTask.java     | 7 ++++++-
 .../java/io/druid/indexing/kafka/KafkaIndexTaskTest.java | 9 +++++----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 7a18dbe07c0..e595d1160e8 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -403,7 +403,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
           }
 
           try {
-            final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row");
+            final byte[] valueBytes = record.value();
+            if (valueBytes == null) {
+              throw new ParseException("null value");
+            }
+
+            final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(valueBytes)), "row");
 
             if (!ioConfig.getMinimumMessageTime().isPresent() ||
                 !ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp())) {
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index bfa66838d2d..1fbf0ebf557 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -173,6 +173,7 @@ public class KafkaIndexTaskTest
       new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "d", "y", 1.0f)),
       new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "e", "y", 1.0f)),
       new ProducerRecord<byte[], byte[]>("topic0", 0, null, "unparseable".getBytes()),
+      new ProducerRecord<byte[], byte[]>("topic0", 0, null, null),
       new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2013", "f", "y", 1.0f)),
       new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2012", "g", "y", 1.0f)),
       new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2011", "h", "y", 1.0f))
@@ -705,7 +706,7 @@ public class KafkaIndexTaskTest
         new KafkaIOConfig(
             "sequence1",
             new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)),
+            new KafkaPartitions("topic0", ImmutableMap.of(0, 8L)),
             kafkaServer.consumerProperties(),
             true,
             false,
@@ -734,7 +735,7 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable());
     Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
     Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
-    Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable());
+    Assert.assertEquals(2, task2.getFireDepartmentMetrics().unparseable());
     Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway());
 
     // Check published segments & metadata, should all be from the first task
@@ -772,7 +773,7 @@ public class KafkaIndexTaskTest
         new KafkaIOConfig(
             "sequence1",
             new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)),
+            new KafkaPartitions("topic0", ImmutableMap.of(0, 8L)),
             kafkaServer.consumerProperties(),
             false,
             false,
@@ -807,7 +808,7 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable());
     Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
    Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
-    Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable());
+    Assert.assertEquals(2, task2.getFireDepartmentMetrics().unparseable());
     Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway());
 
     // Check published segments & metadata
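
Note (reviewer commentary, not part of the patch): a minimal standalone sketch
of the guard this commit adds. Kafka consumers can hand back records whose
value() is null (log-compaction tombstones, for instance), and
ByteBuffer.wrap(null) throws NullPointerException, which would fail the whole
task rather than being counted as one bad row. The fix surfaces the null as a
ParseException so the task's existing parse-error handling applies. The test
changes follow mechanically: the extra null record shifts the end offset from
7L to 8L and raises task2's expected unparseable count from 1 to 2. In the
sketch below, MessageParser and the local ParseException are hypothetical
stand-ins, not Druid's classes; only the null check itself mirrors the diff.

    import java.nio.ByteBuffer;

    public class NullValueGuardSketch
    {
      // Hypothetical stand-in for the record parser the task uses.
      interface MessageParser<T>
      {
        T parse(ByteBuffer buf);
      }

      // Hypothetical stand-in for the ParseException type caught by the task.
      static class ParseException extends RuntimeException
      {
        ParseException(String message)
        {
          super(message);
        }
      }

      // Treat a null Kafka value like any other unparseable message: throw
      // ParseException so the caller's existing error handling (count the row
      // as unparseable, or rethrow when configured to report parse errors)
      // applies, instead of ByteBuffer.wrap throwing NullPointerException.
      static <T> T parseOrThrow(MessageParser<T> parser, byte[] valueBytes)
      {
        if (valueBytes == null) {
          throw new ParseException("null value");
        }
        return parser.parse(ByteBuffer.wrap(valueBytes));
      }
    }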