diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java index 6e6ac8fa10e..0f3cefb4c37 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java @@ -26,12 +26,14 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowListPlusRawValues; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.indexing.seekablestream.SettableByteEntity; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; +import org.joda.time.DateTime; import javax.annotation.Nullable; import java.io.IOException; @@ -195,8 +197,9 @@ public class KafkaInputReader implements InputEntityReader // Remove the dummy timestamp added in KafkaInputFormat newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); + final DateTime timestamp = MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event); return new MapBasedInputRow( - inputRowSchema.getTimestampSpec().extractTimestamp(event), + timestamp, getFinalDimensionList(newDimensions), event ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index 
a45730005a9..fc33852d54d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; @@ -554,6 +555,68 @@ public class KafkaInputFormatTest } } + @Test + public void testMissingTimestampThrowsException() throws IOException + { + final byte[] key = StringUtils.toUtf8( + "{\n" + + " \"key\": \"sampleKey\"\n" + + "}"); + + final byte[] payload = StringUtils.toUtf8( + "{\n" + + " \"timestamp\": \"2021-06-25\",\n" + + " \"bar\": null,\n" + + " \"foo\": \"x\",\n" + + " \"baz\": 4,\n" + + " \"o\": {\n" + + " \"mg\": 1\n" + + " }\n" + + "}"); + + Headers headers = new RecordHeaders(SAMPLE_HEADERS); + inputEntity = new KafkaRecordEntity( + new ConsumerRecord<>( + "sample", + 0, + 0, + timestamp, + null, + null, + 0, + 0, + key, + payload, + headers + ) + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("time", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of( + "bar", "foo", + "kafka.newheader.encoding", + "kafka.newheader.kafkapkc", + "kafka.newts.timestamp" + ))), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + try (CloseableIterator<InputRow> iterator = reader.read()) { + while (iterator.hasNext()) { + Throwable t = Assert.assertThrows(ParseException.class, () ->
iterator.next()); + Assert.assertEquals( + "Timestamp[null] is unparseable! Event: {foo=x, kafka.newts.timestamp=1624492800000, kafka.newkey.key=sampleKey, root_baz=4, bar=null, kafka...", + t.getMessage() + ); + } + } + } + private SettableByteEntity<KafkaRecordEntity> newSettableByteEntity(KafkaRecordEntity kafkaRecordEntity) { SettableByteEntity<KafkaRecordEntity> settableByteEntity = new SettableByteEntity<>(); diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java b/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java index c069e3e37c5..ca2dddecfd9 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java @@ -122,6 +122,12 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>> { final List<String> dimensionsToUse = findDimensions(timestampSpec, dimensionsSpec, theMap); + final DateTime timestamp = parseTimestamp(timestampSpec, theMap); + return new MapBasedInputRow(timestamp, dimensionsToUse, theMap); + } + + public static DateTime parseTimestamp(TimestampSpec timestampSpec, Map<String, Object> theMap) + { final DateTime timestamp; try { timestamp = timestampSpec.extractTimestamp(theMap); @@ -154,7 +160,7 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>> rawMap ); } - return new MapBasedInputRow(timestamp, dimensionsToUse, theMap); + return timestamp; } @Nullable