fix Kafka input format to throw ParseException if timestamp is missing (#14413)

Clint Wylie, 2023-06-13 09:00:11 -07:00 (committed by GitHub)
parent 66c3cc1391
commit 61120dc49a
3 changed files with 74 additions and 2 deletions
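In short: a Kafka record whose merged key/payload/header map lacks the configured timestamp column previously slipped through KafkaInputReader with a null timestamp; it now fails fast with a ParseException. A minimal sketch of the new behavior, assuming the post-commit Druid classes shown in the diff below (the harness class and its sample event are hypothetical):

// Hypothetical harness; MapInputRowParser, TimestampSpec, and ParseException
// are the real Druid classes touched by the diff below.
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.ParseException;

import java.util.Map;

public class MissingTimestampSketch
{
  public static void main(String[] args)
  {
    // The spec expects an ISO "time" column and has no default for missing values.
    final TimestampSpec timestampSpec = new TimestampSpec("time", "iso", null);

    // An event map without the "time" column, e.g. a Kafka payload that lacks it.
    final Map<String, Object> event = ImmutableMap.<String, Object>of("foo", "x", "baz", 4);

    try {
      // parseTimestamp is the helper this commit extracts from MapInputRowParser.parse:
      // it throws ParseException rather than returning a null DateTime.
      MapInputRowParser.parseTimestamp(timestampSpec, event);
    }
    catch (ParseException e) {
      // Message looks like: "Timestamp[null] is unparseable! Event: {foo=x, baz=4}"
      System.out.println(e.getMessage());
    }
  }
}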

KafkaInputReader.java

@@ -26,12 +26,14 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.indexing.seekablestream.SettableByteEntity;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.joda.time.DateTime;

 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -195,8 +197,9 @@ public class KafkaInputReader implements InputEntityReader
       // Remove the dummy timestamp added in KafkaInputFormat
       newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);

+      final DateTime timestamp = MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event);
       return new MapBasedInputRow(
-          inputRowSchema.getTimestampSpec().extractTimestamp(event),
+          timestamp,
           getFinalDimensionList(newDimensions),
           event
       );
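Note that the timestamp is now resolved before the MapBasedInputRow is constructed, so the ParseException carries the fully merged event map, payload fields plus the kafka.* key and header columns, which is exactly what the new test below asserts on.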

KafkaInputFormatTest.java

@@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -554,6 +555,68 @@ public class KafkaInputFormatTest
     }
   }

+  @Test
+  public void testMissingTimestampThrowsException() throws IOException
+  {
+    final byte[] key = StringUtils.toUtf8(
+        "{\n"
+        + "  \"key\": \"sampleKey\"\n"
+        + "}");
+
+    final byte[] payload = StringUtils.toUtf8(
+        "{\n"
+        + "  \"timestamp\": \"2021-06-25\",\n"
+        + "  \"bar\": null,\n"
+        + "  \"foo\": \"x\",\n"
+        + "  \"baz\": 4,\n"
+        + "  \"o\": {\n"
+        + "    \"mg\": 1\n"
+        + "  }\n"
+        + "}");
+
+    Headers headers = new RecordHeaders(SAMPLE_HEADERS);
+    inputEntity = new KafkaRecordEntity(
+        new ConsumerRecord<>(
+            "sample",
+            0,
+            0,
+            timestamp,
+            null,
+            null,
+            0,
+            0,
+            key,
+            payload,
+            headers
+        )
+    );
+
+    final InputEntityReader reader = format.createReader(
+        new InputRowSchema(
+            new TimestampSpec("time", "iso", null),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
+                "bar", "foo",
+                "kafka.newheader.encoding",
+                "kafka.newheader.kafkapkc",
+                "kafka.newts.timestamp"
+            ))),
+            ColumnsFilter.all()
+        ),
+        newSettableByteEntity(inputEntity),
+        null
+    );
+
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      while (iterator.hasNext()) {
+        final Throwable t = Assert.assertThrows(ParseException.class, () -> iterator.next());
+        Assert.assertEquals(
+            "Timestamp[null] is unparseable! Event: {foo=x, kafka.newts.timestamp=1624492800000, kafka.newkey.key=sampleKey, root_baz=4, bar=null, kafka...",
+            t.getMessage()
+        );
+      }
+    }
+  }
+
   private SettableByteEntity<KafkaRecordEntity> newSettableByteEntity(KafkaRecordEntity kafkaRecordEntity)
   {
     SettableByteEntity<KafkaRecordEntity> settableByteEntity = new SettableByteEntity<>();
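The helper the reader relies on is carved out of MapInputRowParser.parse below. The null check and the "unparseable" error formatting that sit between the two hunks are unchanged; parse simply delegates to the new method.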

MapInputRowParser.java

@@ -122,6 +122,12 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
   {
     final List<String> dimensionsToUse = findDimensions(timestampSpec, dimensionsSpec, theMap);
+    final DateTime timestamp = parseTimestamp(timestampSpec, theMap);
+    return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
+  }
+
+  public static DateTime parseTimestamp(TimestampSpec timestampSpec, Map<String, Object> theMap)
+  {
     final DateTime timestamp;
     try {
       timestamp = timestampSpec.extractTimestamp(theMap);
@@ -154,7 +160,7 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
           rawMap
       );
     }
-    return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
+    return timestamp;
   }

   @Nullable
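Making parseTimestamp a public static method keeps a single source of truth for the "Timestamp[null] is unparseable!" message asserted in the test above, rather than duplicating the null check and its error formatting in KafkaInputReader.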