From ff5ae4db6c4330b164ad3550c7c9c976699e4ff5 Mon Sep 17 00:00:00 2001
From: Clint Wylie
Date: Thu, 15 Jun 2023 00:11:04 -0700
Subject: [PATCH] fix kafka input format reader schema discovery and partial
 schema discovery (#14421)

* fix kafka input format reader schema discovery and partial schema discovery
  to actually work right, by re-using dimension filtering logic of
  MapInputRowParser
---
 .../input/kafkainput/KafkaInputReader.java    |  33 +-
 .../kafkainput/KafkaInputFormatTest.java      | 582 +++++++++++++-----
 .../data/input/impl/MapInputRowParser.java    |  39 +-
 3 files changed, 473 insertions(+), 181 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index 0f3cefb4c37..6d43a2e96fe 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -20,7 +20,6 @@
 package org.apache.druid.data.input.kafkainput;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -114,18 +113,6 @@ public class KafkaInputReader implements InputEntityReader
     }
   }
 
-  private List<String> getFinalDimensionList(Set<String> newDimensions)
-  {
-    final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
-    if (!schemaDimensions.isEmpty()) {
-      return schemaDimensions;
-    } else {
-      return Lists.newArrayList(
-          Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
-      );
-    }
-  }
-
   private Map<String, Object> extractHeader(KafkaRecordEntity record)
   {
     final Map<String, Object> mergedHeaderMap = new HashMap<>();
@@ -200,7 +187,11 @@ public class KafkaInputReader implements InputEntityReader
       final DateTime timestamp = MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event);
       return new MapBasedInputRow(
           timestamp,
-          getFinalDimensionList(newDimensions),
+          MapInputRowParser.findDimensions(
+              inputRowSchema.getTimestampSpec(),
+              inputRowSchema.getDimensionsSpec(),
+              newDimensions
+          ),
           event
       );
     }
@@ -272,7 +263,11 @@ public class KafkaInputReader implements InputEntityReader
             newInputRows.add(
                 new MapBasedInputRow(
                     inputRowSchema.getTimestampSpec().extractTimestamp(event),
-                    getFinalDimensionList(newDimensions),
+                    MapInputRowParser.findDimensions(
+                        inputRowSchema.getTimestampSpec(),
+                        inputRowSchema.getDimensionsSpec(),
+                        newDimensions
+                    ),
                     event
                 )
             );
@@ -282,13 +277,17 @@ public class KafkaInputReader implements InputEntityReader
           }
       );
     }
-
+
   private List<InputRow> buildInputRowsForMap(Map<String, Object> headerKeyList)
   {
     return Collections.singletonList(
         new MapBasedInputRow(
             inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
-            getFinalDimensionList(headerKeyList.keySet()),
+            MapInputRowParser.findDimensions(
+                inputRowSchema.getTimestampSpec(),
+                inputRowSchema.getDimensionsSpec(),
+                headerKeyList.keySet()
+            ),
             headerKeyList
         )
     );
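Why delegating to MapInputRowParser.findDimensions fixes the bug: the removed getFinalDimensionList returned the declared dimension list whenever it was non-empty, so a partially declared schema with discovery enabled silently dropped every discovered column, and the pure-discovery path went through unordered Sets.difference. A minimal, self-contained sketch of the before/after filtering — helper names here are illustrative, not Druid API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DimensionFilteringSketch
{
  // Old behavior, as in the removed getFinalDimensionList: a non-empty declared
  // list short-circuits, so discovered fields never make it into the row.
  static List<String> oldHelper(List<String> declared, Set<String> discovered)
  {
    return declared.isEmpty() ? new ArrayList<>(discovered) : declared;
  }

  // New behavior, mirroring MapInputRowParser.findDimensions with schema
  // discovery enabled: declared dimensions first, then discovered fields that
  // are neither the timestamp column nor excluded.
  static List<String> newHelper(List<String> declared, Set<String> discovered, String timestampColumn)
  {
    LinkedHashSet<String> dims = new LinkedHashSet<>(declared);
    for (String field : discovered) {
      if (!timestampColumn.equals(field)) {
        dims.add(field);
      }
    }
    return new ArrayList<>(dims);
  }

  public static void main(String[] args)
  {
    List<String> declared = Arrays.asList("bar", "kafka.newheader.kafkapkc");
    Set<String> discovered = new LinkedHashSet<>(Arrays.asList("timestamp", "foo", "baz", "bar"));

    System.out.println(oldHelper(declared, discovered));
    // [bar, kafka.newheader.kafkapkc]  <- discovered "foo" and "baz" are lost
    System.out.println(newHelper(declared, discovered, "timestamp"));
    // [bar, kafka.newheader.kafkapkc, foo, baz]
  }
}
```

Re-using the parser's logic also keeps Kafka's prefixed header/key/timestamp columns subject to the same exclusion and timestamp-column rules as payload fields.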
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
index fc33852d54d..f65f335df9a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
@@ -53,35 +53,42 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Optional;
 
 public class KafkaInputFormatTest
 {
   private KafkaRecordEntity inputEntity;
-  private long timestamp = DateTimes.of("2021-06-24").getMillis();
-  private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(new Header() {
-    @Override
-    public String key()
+  private final long timestamp = DateTimes.of("2021-06-24").getMillis();
+  private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(
+      new Header()
       {
-        return "encoding";
-      }
-    @Override
-    public byte[] value()
+        @Override
+        public String key()
+        {
+          return "encoding";
+        }
+
+        @Override
+        public byte[] value()
+        {
+          return "application/json".getBytes(StandardCharsets.UTF_8);
+        }
+      },
+      new Header()
       {
-        return "application/json".getBytes(StandardCharsets.UTF_8);
+        @Override
+        public String key()
+        {
+          return "kafkapkc";
+        }
+
+        @Override
+        public byte[] value()
+        {
+          return "pkc-bar".getBytes(StandardCharsets.UTF_8);
+        }
       }
-  },
-  new Header() {
-    @Override
-    public String key()
-    {
-      return "kafkapkc";
-    }
-    @Override
-    public byte[] value()
-    {
-      return "pkc-bar".getBytes(StandardCharsets.UTF_8);
-    }
-  });
+  );
   private KafkaInputFormat format;
 
   @Before
@@ -92,8 +99,11 @@ public class KafkaInputFormatTest
         // Key Format
         new JsonInputFormat(
             new JSONPathSpec(true, ImmutableList.of()),
-            null, null, false, //make sure JsonReader is used
-            false, false
+            null,
+            null,
+            false, //make sure JsonReader is used
+            false,
+            false
         ),
         // Value Format
         new JsonInputFormat(
@@ -108,10 +118,15 @@ public class KafkaInputFormatTest
                     new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
                 )
             ),
-            null, null, false, //make sure JsonReader is used
-            false, false
+            null,
+            null,
+            false, //make sure JsonReader is used
+            false,
+            false
         ),
-        "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp"
+        "kafka.newheader.",
+        "kafka.newkey.key",
+        "kafka.newts.timestamp"
     );
   }
 
   @Test
@@ -124,8 +139,11 @@ public class KafkaInputFormatTest
         // Key Format
         new JsonInputFormat(
             new JSONPathSpec(true, ImmutableList.of()),
-            null, null, false, //make sure JsonReader is used
-            false, false
+            null,
+            null,
+            false, //make sure JsonReader is used
+            false,
+            false
         ),
         // Value Format
         new JsonInputFormat(
@@ -140,16 +158,21 @@ public class KafkaInputFormatTest
                     new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
                 )
             ),
-            null, null, false, //make sure JsonReader is used
-            false, false
+            null,
+            null,
+            false, //make sure JsonReader is used
+            false,
+            false
         ),
-        "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp"
+        "kafka.newheader.",
+        "kafka.newkey.key",
+        "kafka.newts.timestamp"
     );
 
     Assert.assertEquals(format, kif);
 
     final byte[] formatBytes = mapper.writeValueAsBytes(format);
     final byte[] kifBytes = mapper.writeValueAsBytes(kif);
-    Assert.assertTrue(Arrays.equals(formatBytes, kifBytes));
+    Assert.assertArrayEquals(formatBytes, kifBytes);
   }
 
   @Test
@@ -158,7 +181,8 @@ public class KafkaInputFormatTest
     final byte[] key = StringUtils.toUtf8(
         "{\n"
         + "    \"key\": \"sampleKey\"\n"
-        + "}");
+        + "}"
+    );
 
     final byte[] payload = StringUtils.toUtf8(
         "{\n"
@@ -169,23 +193,26 @@ public class KafkaInputFormatTest
         + "    \"o\": {\n"
         + "        \"mg\": 1\n"
         + "    }\n"
-        + "}");
+        + "}"
+    );
 
     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
-    inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
-        "sample", 0, 0, timestamp,
-        null, null, 0, 0,
-        key, payload, headers));
+    inputEntity = makeInputEntity(key, payload, headers);
 
     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newheader.encoding",
-                "kafka.newheader.kafkapkc",
-                "kafka.newts.timestamp"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newheader.encoding",
+                        "kafka.newheader.kafkapkc",
+                        "kafka.newts.timestamp"
+                    )
+                )
+            ),
             ColumnsFilter.all()
         ),
         newSettableByteEntity(inputEntity),
         null
     );
@@ -198,8 +225,20 @@ public class KafkaInputFormatTest
       while (iterator.hasNext()) {
 
         final InputRow row = iterator.next();
-
+        Assert.assertEquals(
+            Arrays.asList(
+                "bar",
+                "foo",
+                "kafka.newheader.encoding",
+                "kafka.newheader.kafkapkc",
+                "kafka.newts.timestamp"
+            ),
+            row.getDimensions()
+        );
         // Payload verifications
+        // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
+        // but test reading them anyway since it isn't technically illegal
+
         Assert.assertEquals(DateTimes.of("2021-06-25"), row.getTimestamp());
         Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
         Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
@@ -211,10 +250,14 @@ public class KafkaInputFormatTest
         // Header verification
         Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
         Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc")));
-        Assert.assertEquals(String.valueOf(DateTimes.of("2021-06-24").getMillis()),
-                            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")));
-        Assert.assertEquals("2021-06-25",
-                            Iterables.getOnlyElement(row.getDimension("timestamp")));
+        Assert.assertEquals(
+            String.valueOf(DateTimes.of("2021-06-24").getMillis()),
+            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
+        );
+        Assert.assertEquals(
+            "2021-06-25",
+            Iterables.getOnlyElement(row.getDimension("timestamp"))
+        );
 
         // Key verification
         Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key")));
@@ -243,23 +286,26 @@ public class KafkaInputFormatTest
         + "    \"o\": {\n"
         + "        \"mg\": 1\n"
         + "    }\n"
-        + "}");
+        + "}"
+    );
 
     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
-    inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
-        "sample", 0, 0, timestamp,
-        null, null, 0, 0,
-        null, payload, headers));
+    inputEntity = makeInputEntity(null, payload, headers);
 
     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newheader.encoding",
-                "kafka.newheader.kafkapkc",
-                "kafka.newts.timestamp"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newheader.encoding",
+                        "kafka.newheader.kafkapkc",
+                        "kafka.newts.timestamp"
+                    )
+                )
+            ),
             ColumnsFilter.all()
         ),
         newSettableByteEntity(inputEntity),
         null
     );
@@ -289,23 +335,29 @@ public class KafkaInputFormatTest
     Iterable<Header> sample_header_with_ts = Iterables.unmodifiableIterable(
         Iterables.concat(
             SAMPLE_HEADERS,
-            ImmutableList.of(new Header() {
-              @Override
-              public String key()
-              {
-                return "headerTs";
-              }
-              @Override
-              public byte[] value()
-              {
-                return "2021-06-24".getBytes(StandardCharsets.UTF_8);
-              }
-            }
-        )));
+            ImmutableList.of(
+                new Header()
+                {
+                  @Override
+                  public String key()
+                  {
+                    return "headerTs";
+                  }
+
+                  @Override
+                  public byte[] value()
+                  {
+                    return "2021-06-24".getBytes(StandardCharsets.UTF_8);
+                  }
+                }
+            )
+        )
+    );
     final byte[] key = StringUtils.toUtf8(
         "{\n"
         + "    \"key\": \"sampleKey\"\n"
-        + "}");
+        + "}"
+    );
 
     final byte[] payload = StringUtils.toUtf8(
         "{\n"
@@ -316,22 +368,25 @@ public class KafkaInputFormatTest
         + "    \"o\": {\n"
         + "        \"mg\": 1\n"
         + "    }\n"
-        + "}");
+        + "}"
+    );
 
     Headers headers = new RecordHeaders(sample_header_with_ts);
-    inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
-        "sample", 0, 0, timestamp,
-        null, null, 0, 0,
-        key, payload, headers));
+    inputEntity = makeInputEntity(key, payload, headers);
 
     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("kafka.newheader.headerTs", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newheader.encoding",
-                "kafka.newheader.kafkapkc"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newheader.encoding",
+                        "kafka.newheader.kafkapkc"
+                    )
+                )
+            ),
             ColumnsFilter.all()
         ),
         newSettableByteEntity(inputEntity),
         null
     );
@@ -345,6 +400,9 @@ public class KafkaInputFormatTest
       final InputRow row = iterator.next();
 
       // Payload verifications
+      // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
+      // but test reading them anyway since it isn't technically illegal
+
       Assert.assertEquals(DateTimes.of("2021-06-24"), row.getTimestamp());
       Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
       Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
@@ -356,12 +414,18 @@ public class KafkaInputFormatTest
       // Header verification
       Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
       Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc")));
-      Assert.assertEquals(String.valueOf(DateTimes.of("2021-06-24").getMillis()),
-                          Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")));
-      Assert.assertEquals("2021-06-24",
-                          Iterables.getOnlyElement(row.getDimension("kafka.newheader.headerTs")));
-      Assert.assertEquals("2021-06-24",
-                          Iterables.getOnlyElement(row.getDimension("timestamp")));
+      Assert.assertEquals(
+          String.valueOf(DateTimes.of("2021-06-24").getMillis()),
+          Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
+      );
+      Assert.assertEquals(
+          "2021-06-24",
+          Iterables.getOnlyElement(row.getDimension("kafka.newheader.headerTs"))
+      );
+      Assert.assertEquals(
+          "2021-06-24",
+          Iterables.getOnlyElement(row.getDimension("timestamp"))
+      );
 
       // Key verification
       Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key")));
@@ -389,13 +453,11 @@ public class KafkaInputFormatTest
         + "    \"o\": {\n"
         + "        \"mg\": 1\n"
         + "    }\n"
-        + "}");
+        + "}"
+    );
 
     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
-    inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
-        "sample", 0, 0, timestamp,
-        null, null, 0, 0,
-        null, payload, headers));
+    inputEntity = makeInputEntity(null, payload, headers);
 
     KafkaInputFormat localFormat = new KafkaInputFormat(
         null,
@@ -422,10 +484,15 @@ public class KafkaInputFormatTest
     final InputEntityReader reader = localFormat.createReader(
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newts.timestamp"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newts.timestamp"
+                    )
+                )
+            ),
             ColumnsFilter.all()
         ),
         newSettableByteEntity(inputEntity),
         null
     );
@@ -440,6 +507,8 @@ public class KafkaInputFormatTest
       final InputRow row = iterator.next();
 
       // Key verification
+      // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
+      // but test reading them anyway since it isn't technically illegal
       Assert.assertTrue(row.getDimension("kafka.newkey.key").isEmpty());
       Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
       Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
@@ -464,23 +533,25 @@ public class KafkaInputFormatTest
     for (int i = 0; i < keys.length; i++) {
       keys[i] = StringUtils.toUtf8(
           "{\n"
-          + "    \"key\": \"sampleKey-" + i + "\"\n"
-          + "}");
+              + "    \"key\": \"sampleKey-" + i + "\"\n"
+              + "}"
+      );
     }
     keys[2] = null;
 
     for (int i = 0; i < values.length; i++) {
       values[i] = StringUtils.toUtf8(
           "{\n"
-          + "    \"timestamp\": \"2021-06-2" + i + "\",\n"
-          + "    \"bar\": null,\n"
-          + "    \"foo\": \"x\",\n"
-          + "    \"baz\": 4,\n"
-          + "    \"index\": " + i + ",\n"
-          + "    \"o\": {\n"
-          + "        \"mg\": 1\n"
-          + "    }\n"
-          + "}");
+              + "    \"timestamp\": \"2021-06-2" + i + "\",\n"
+              + "    \"bar\": null,\n"
+              + "    \"foo\": \"x\",\n"
+              + "    \"baz\": 4,\n"
+              + "    \"index\": " + i + ",\n"
+              + "    \"o\": {\n"
+              + "        \"mg\": 1\n"
+              + "    }\n"
+              + "}"
+      );
     }
 
     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
@@ -489,12 +560,17 @@ public class KafkaInputFormatTest
     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newheader.encoding",
-                "kafka.newheader.kafkapkc",
-                "kafka.newts.timestamp"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newheader.encoding",
+                        "kafka.newheader.kafkapkc",
+                        "kafka.newts.timestamp"
+                    )
+                )
+            ),
             ColumnsFilter.all()
         ),
         settableByteEntity,
         null
     );
@@ -504,10 +580,7 @@ public class KafkaInputFormatTest
     for (int i = 0; i < keys.length; i++) {
       headers = headers.add(new RecordHeader("indexH", String.valueOf(i).getBytes(StandardCharsets.UTF_8)));
 
-      inputEntity = new KafkaRecordEntity(new ConsumerRecord<>(
-          "sample", 0, 0, timestamp,
-          null, null, 0, 0,
-          keys[i], values[i], headers));
+      inputEntity = makeInputEntity(keys[i], values[i], headers);
       settableByteEntity.setEntity(inputEntity);
 
       final int numExpectedIterations = 1;
@@ -518,6 +591,8 @@ public class KafkaInputFormatTest
           final InputRow row = iterator.next();
 
           // Payload verification
+          // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
+          // but test reading them anyway since it isn't technically illegal
           Assert.assertEquals(DateTimes.of("2021-06-2" + i), row.getTimestamp());
           Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
           Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
@@ -529,10 +604,15 @@ public class KafkaInputFormatTest
 
           // Header verification
-          Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
+          Assert.assertEquals(
+              "application/json",
+              Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))
+          );
           Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc")));
-          Assert.assertEquals(String.valueOf(DateTimes.of("2021-06-24").getMillis()),
-                              Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")));
+          Assert.assertEquals(
+              String.valueOf(DateTimes.of("2021-06-24").getMillis()),
+              Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
+          );
 
           Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("kafka.newheader.indexH")));
@@ -561,7 +641,8 @@ public class KafkaInputFormatTest
     final byte[] key = StringUtils.toUtf8(
         "{\n"
         + "    \"key\": \"sampleKey\"\n"
-        + "}");
+        + "}"
+    );
 
     final byte[] payload = StringUtils.toUtf8(
         "{\n"
@@ -572,34 +653,26 @@ public class KafkaInputFormatTest
         + "    \"o\": {\n"
        + "        \"mg\": 1\n"
         + "    }\n"
-        + "}");
+        + "}"
+    );
 
     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
-    inputEntity = new KafkaRecordEntity(
-        new ConsumerRecord<>(
-            "sample",
-            0,
-            0,
-            timestamp,
-            null,
-            null,
-            0,
-            0,
-            key,
-            payload,
-            headers
-        )
-    );
+    inputEntity = makeInputEntity(key, payload, headers);
 
     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("time", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newheader.encoding",
-                "kafka.newheader.kafkapkc",
-                "kafka.newts.timestamp"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newheader.encoding",
+                        "kafka.newheader.kafkapkc",
+                        "kafka.newts.timestamp"
+                    )
+                )
+            ),
             ColumnsFilter.all()
        ),
        newSettableByteEntity(inputEntity),
        null
    );
@@ -617,6 +690,221 @@ public class KafkaInputFormatTest
     }
   }
 
+  @Test
+  public void testWithSchemaDiscovery() throws IOException
+  {
+    // testWithHeaderKeyAndValue + schemaless
+    final byte[] key = StringUtils.toUtf8(
+        "{\n"
+            + "    \"key\": \"sampleKey\"\n"
+            + "}"
+    );
+
+    final byte[] payload = StringUtils.toUtf8(
+        "{\n"
+            + "    \"timestamp\": \"2021-06-25\",\n"
+            + "    \"bar\": null,\n"
+            + "    \"foo\": \"x\",\n"
+            + "    \"baz\": 4,\n"
+            + "    \"o\": {\n"
+            + "        \"mg\": 1\n"
+            + "    }\n"
+            + "}"
+    );
+
+    Headers headers = new RecordHeaders(SAMPLE_HEADERS);
+    inputEntity = makeInputEntity(key, payload, headers);
+
+    final InputEntityReader reader = format.createReader(
+        new InputRowSchema(
+            new TimestampSpec("timestamp", "iso", null),
+            DimensionsSpec.builder().useSchemaDiscovery(true).build(),
+            ColumnsFilter.all()
+        ),
+        newSettableByteEntity(inputEntity),
+        null
+    );
+
+    final int numExpectedIterations = 1;
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      int numActualIterations = 0;
+      while (iterator.hasNext()) {
+
+        final InputRow row = iterator.next();
+        Assert.assertEquals(
+            Arrays.asList(
+                "foo",
+                "kafka.newts.timestamp",
+                "kafka.newkey.key",
+                "root_baz",
+                "o",
+                "bar",
+                "kafka.newheader.kafkapkc",
+                "path_omg",
+                "jq_omg",
+                "jq_omg2",
+                "baz",
+                "root_baz2",
+                "kafka.newheader.encoding",
+                "path_omg2"
+            ),
+            row.getDimensions()
+        );
+
+        // Payload verifications
+        Assert.assertEquals(DateTimes.of("2021-06-25"), row.getTimestamp());
+        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
+        Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
+
+        // Header verification
+        Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
+        Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc")));
+        Assert.assertEquals(
+            String.valueOf(DateTimes.of("2021-06-24").getMillis()),
+            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
+        );
+        Assert.assertEquals(
+            "2021-06-25",
+            Iterables.getOnlyElement(row.getDimension("timestamp"))
+        );
+
+        // Key verification
+        Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key")));
+
+        Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+        Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+        Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
+
+        numActualIterations++;
+      }
+
+      Assert.assertEquals(numExpectedIterations, numActualIterations);
+    }
+  }
+
+  @Test
+  public void testWithPartialDeclarationSchemaDiscovery() throws IOException
+  {
+    // testWithHeaderKeyAndValue + partial-schema + schema discovery
+    final byte[] key = StringUtils.toUtf8(
+        "{\n"
+            + "    \"key\": \"sampleKey\"\n"
+            + "}"
+    );
+
+    final byte[] payload = StringUtils.toUtf8(
+        "{\n"
+            + "    \"timestamp\": \"2021-06-25\",\n"
+            + "    \"bar\": null,\n"
+            + "    \"foo\": \"x\",\n"
+            + "    \"baz\": 4,\n"
+            + "    \"o\": {\n"
+            + "        \"mg\": 1\n"
+            + "    }\n"
+            + "}"
+    );
+
+    Headers headers = new RecordHeaders(SAMPLE_HEADERS);
+    inputEntity = makeInputEntity(key, payload, headers);
+
+    final InputEntityReader reader = format.createReader(
+        new InputRowSchema(
+            new TimestampSpec("timestamp", "iso", null),
+            DimensionsSpec.builder().setDimensions(
+                DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "kafka.newheader.kafkapkc"))
+            ).useSchemaDiscovery(true).build(),
+            ColumnsFilter.all()
+        ),
+        newSettableByteEntity(inputEntity),
+        null
+    );
+
+    final int numExpectedIterations = 1;
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      int numActualIterations = 0;
+      while (iterator.hasNext()) {
+
+        final InputRow row = iterator.next();
+
+        Assert.assertEquals(
+            Arrays.asList(
+                "bar",
+                "kafka.newheader.kafkapkc",
+                "foo",
+                "kafka.newts.timestamp",
+                "kafka.newkey.key",
+                "root_baz",
+                "o",
+                "path_omg",
+                "jq_omg",
+                "jq_omg2",
+                "baz",
+                "root_baz2",
+                "kafka.newheader.encoding",
+                "path_omg2"
+            ),
+            row.getDimensions()
+        );
+
+        // Payload verifications
+        Assert.assertEquals(DateTimes.of("2021-06-25"), row.getTimestamp());
+        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
+        Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
+
+        // Header verification
+        Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
+        Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc")));
+        Assert.assertEquals(
+            String.valueOf(DateTimes.of("2021-06-24").getMillis()),
+            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
+        );
+        Assert.assertEquals(
+            "2021-06-25",
+            Iterables.getOnlyElement(row.getDimension("timestamp"))
+        );
+
+        // Key verification
+        Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key")));
+
+        Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+        Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+        Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
+
+        numActualIterations++;
+      }
+
+      Assert.assertEquals(numExpectedIterations, numActualIterations);
+    }
+  }
+
+  private KafkaRecordEntity makeInputEntity(byte[] key, byte[] payload, Headers headers)
+  {
+    return new KafkaRecordEntity(
+        new ConsumerRecord<>(
+            "sample",
+            0,
+            0,
+            timestamp,
+            null,
+            0,
+            0,
+            key,
+            payload,
+            headers,
+            Optional.empty()
+        )
+    );
+  }
+
   private SettableByteEntity<KafkaRecordEntity> newSettableByteEntity(KafkaRecordEntity kafkaRecordEntity)
   {
     SettableByteEntity<KafkaRecordEntity> settableByteEntity = new SettableByteEntity<>();
Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc"))); + Assert.assertEquals( + String.valueOf(DateTimes.of("2021-06-24").getMillis()), + Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")) + ); + Assert.assertEquals( + "2021-06-25", + Iterables.getOnlyElement(row.getDimension("timestamp")) + ); + + // Key verification + Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key"))); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + private KafkaRecordEntity makeInputEntity(byte[] key, byte[] payload, Headers headers) + { + return new KafkaRecordEntity( + new ConsumerRecord<>( + "sample", + 0, + 0, + timestamp, + null, + 0, + 0, + key, + payload, + headers, + Optional.empty() + ) + ); + } + + private SettableByteEntity newSettableByteEntity(KafkaRecordEntity kafkaRecordEntity) { SettableByteEntity settableByteEntity = new SettableByteEntity<>(); diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java b/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java index ca2dddecfd9..fcd68741f1a 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java @@ -32,6 +32,7 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -66,6 +67,23 @@ public class MapInputRowParser implements InputRowParser> return parse(inputRowSchema.getTimestampSpec(), inputRowSchema.getDimensionsSpec(), theMap); } + @VisibleForTesting + static InputRow parse( + TimestampSpec timestampSpec, + DimensionsSpec dimensionsSpec, + Map theMap + ) throws ParseException + { + final List dimensionsToUse = findDimensions( + timestampSpec, + dimensionsSpec, + theMap == null ? Collections.emptySet() : theMap.keySet() + ); + + final DateTime timestamp = parseTimestamp(timestampSpec, theMap); + return new MapBasedInputRow(timestamp, dimensionsToUse, theMap); + } + /** * Finds the final set of dimension names to use for {@link InputRow}. * There are 3 cases here. @@ -80,17 +98,17 @@ public class MapInputRowParser implements InputRowParser> * In any case, the returned list does not include any dimensions in {@link DimensionsSpec#getDimensionExclusions()} * or {@link TimestampSpec#getTimestampColumn()}. 
*/ - private static List findDimensions( + public static List findDimensions( TimestampSpec timestampSpec, DimensionsSpec dimensionsSpec, - Map rawInputRow + Set fields ) { final String timestampColumn = timestampSpec.getTimestampColumn(); final Set exclusions = dimensionsSpec.getDimensionExclusions(); if (dimensionsSpec.isIncludeAllDimensions() || dimensionsSpec.useSchemaDiscovery()) { LinkedHashSet dimensions = new LinkedHashSet<>(dimensionsSpec.getDimensionNames()); - for (String field : rawInputRow.keySet()) { + for (String field : fields) { if (timestampColumn.equals(field) || exclusions.contains(field)) { continue; } @@ -102,7 +120,7 @@ public class MapInputRowParser implements InputRowParser> return dimensionsSpec.getDimensionNames(); } else { List dimensions = new ArrayList<>(); - for (String field : rawInputRow.keySet()) { + for (String field : fields) { if (timestampColumn.equals(field) || exclusions.contains(field)) { continue; } @@ -113,19 +131,6 @@ public class MapInputRowParser implements InputRowParser> } } - @VisibleForTesting - static InputRow parse( - TimestampSpec timestampSpec, - DimensionsSpec dimensionsSpec, - Map theMap - ) throws ParseException - { - final List dimensionsToUse = findDimensions(timestampSpec, dimensionsSpec, theMap); - - final DateTime timestamp = parseTimestamp(timestampSpec, theMap); - return new MapBasedInputRow(timestamp, dimensionsToUse, theMap); - } - public static DateTime parseTimestamp(TimestampSpec timestampSpec, Map theMap) { final DateTime timestamp;
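For reference, a short usage sketch of the now-public findDimensions covering the three cases its javadoc names, built only from classes that appear in this patch. The expected outputs in the comments are read off the branch logic visible above, and the setDimensionExclusions builder call is assumed from DimensionsSpec's builder rather than shown in this diff:

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;

import java.util.Set;

public class FindDimensionsSketch
{
  public static void main(String[] args)
  {
    final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
    final Set<String> fields = ImmutableSet.of("timestamp", "bar", "foo", "baz");

    // Case 1: schema discovery on top of a partial declaration. Declared
    // dimensions come first, then discovered fields (timestamp column skipped).
    final DimensionsSpec partial = DimensionsSpec.builder()
        .setDimensions(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar")))
        .useSchemaDiscovery(true)
        .build();
    System.out.println(MapInputRowParser.findDimensions(timestampSpec, partial, fields));
    // -> [bar, foo, baz]

    // Case 2: a fully declared schema wins; discovered fields are ignored.
    final DimensionsSpec declared = new DimensionsSpec(
        DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))
    );
    System.out.println(MapInputRowParser.findDimensions(timestampSpec, declared, fields));
    // -> [bar, foo]

    // Case 3: nothing declared; every field except the timestamp column and
    // the exclusions becomes a dimension, in encounter order.
    final DimensionsSpec empty = DimensionsSpec.builder()
        .setDimensionExclusions(ImmutableList.of("baz"))
        .build();
    System.out.println(MapInputRowParser.findDimensions(timestampSpec, empty, fields));
    // -> [bar, foo]
  }
}
```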