fix kafka input format reader schema discovery and partial schema discovery (#14421)

* fix kafka input format reader schema discovery and partial schema discovery to actually work right, by re-using dimension filtering logic of MapInputRowParser
Clint Wylie 2023-06-15 00:11:04 -07:00, committed by GitHub
parent ca116cf886
commit ff5ae4db6c
3 changed files with 473 additions and 181 deletions
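
For context on the fix below: the removed KafkaInputReader#getFinalDimensionList returned the declared dimension list whenever it was non-empty and otherwise returned the discovered keys minus exclusions, so it never consulted useSchemaDiscovery or includeAllDimensions and never filtered out the timestamp column. Re-using MapInputRowParser.findDimensions fixes both. A minimal sketch of the corrected behavior, using the Druid classes touched by this commit but a made-up field set (the class FindDimensionsSketch and its data are illustrative, not part of the commit):

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;

import java.util.List;

public class FindDimensionsSketch
{
  public static void main(String[] args)
  {
    final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);

    // Partially declared schema with discovery enabled: "bar" is declared,
    // "foo" arrives only in the data.
    final DimensionsSpec partial = DimensionsSpec.builder()
        .setDimensions(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar")))
        .useSchemaDiscovery(true)
        .build();

    // Old getFinalDimensionList: the declared list is non-empty, so it returned
    // ["bar"] and silently dropped the discovered "foo".
    // New findDimensions: declared dimensions first, then discovered fields,
    // skipping the timestamp column and any exclusions.
    final List<String> dims = MapInputRowParser.findDimensions(
        timestampSpec,
        partial,
        ImmutableSet.of("timestamp", "foo", "bar")
    );
    System.out.println(dims); // [bar, foo]
  }
}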

KafkaInputReader.java

@@ -20,7 +20,6 @@
 package org.apache.druid.data.input.kafkainput;

 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -114,18 +113,6 @@ public class KafkaInputReader implements InputEntityReader
     }
   }

-  private List<String> getFinalDimensionList(Set<String> newDimensions)
-  {
-    final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
-    if (!schemaDimensions.isEmpty()) {
-      return schemaDimensions;
-    } else {
-      return Lists.newArrayList(
-          Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
-      );
-    }
-  }
-
   private Map<String, Object> extractHeader(KafkaRecordEntity record)
   {
     final Map<String, Object> mergedHeaderMap = new HashMap<>();
@@ -200,7 +187,11 @@ public class KafkaInputReader implements InputEntityReader
     final DateTime timestamp = MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event);
     return new MapBasedInputRow(
         timestamp,
-        getFinalDimensionList(newDimensions),
+        MapInputRowParser.findDimensions(
+            inputRowSchema.getTimestampSpec(),
+            inputRowSchema.getDimensionsSpec(),
+            newDimensions
+        ),
         event
     );
   }
@@ -272,7 +263,11 @@ public class KafkaInputReader implements InputEntityReader
         newInputRows.add(
             new MapBasedInputRow(
                 inputRowSchema.getTimestampSpec().extractTimestamp(event),
-                getFinalDimensionList(newDimensions),
+                MapInputRowParser.findDimensions(
+                    inputRowSchema.getTimestampSpec(),
+                    inputRowSchema.getDimensionsSpec(),
+                    newDimensions
+                ),
                 event
             )
         );
@@ -288,7 +283,11 @@ public class KafkaInputReader implements InputEntityReader
     return Collections.singletonList(
         new MapBasedInputRow(
             inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
-            getFinalDimensionList(headerKeyList.keySet()),
+            MapInputRowParser.findDimensions(
+                inputRowSchema.getTimestampSpec(),
+                inputRowSchema.getDimensionsSpec(),
+                headerKeyList.keySet()
+            ),
             headerKeyList
         )
     );

KafkaInputFormatTest.java

@@ -53,35 +53,42 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Optional;

 public class KafkaInputFormatTest
 {
   private KafkaRecordEntity inputEntity;
-  private long timestamp = DateTimes.of("2021-06-24").getMillis();
-  private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(new Header() {
-    @Override
-    public String key()
-    {
-      return "encoding";
-    }
-
-    @Override
-    public byte[] value()
-    {
-      return "application/json".getBytes(StandardCharsets.UTF_8);
-    }
-  },
-  new Header() {
-    @Override
-    public String key()
-    {
-      return "kafkapkc";
-    }
-
-    @Override
-    public byte[] value()
-    {
-      return "pkc-bar".getBytes(StandardCharsets.UTF_8);
-    }
-  });
+  private final long timestamp = DateTimes.of("2021-06-24").getMillis();
+  private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(
+      new Header()
+      {
+        @Override
+        public String key()
+        {
+          return "encoding";
+        }
+
+        @Override
+        public byte[] value()
+        {
+          return "application/json".getBytes(StandardCharsets.UTF_8);
+        }
+      },
+      new Header()
+      {
+        @Override
+        public String key()
+        {
+          return "kafkapkc";
+        }
+
+        @Override
+        public byte[] value()
+        {
+          return "pkc-bar".getBytes(StandardCharsets.UTF_8);
+        }
+      }
+  );

   private KafkaInputFormat format;

   @Before
@@ -92,8 +99,11 @@ public class KafkaInputFormatTest
         // Key Format
         new JsonInputFormat(
             new JSONPathSpec(true, ImmutableList.of()),
-            null, null, false, //make sure JsonReader is used
-            false, false
+            null,
+            null,
+            false, //make sure JsonReader is used
+            false,
+            false
         ),
         // Value Format
         new JsonInputFormat(
@@ -108,10 +118,15 @@ public class KafkaInputFormatTest
                     new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
                 )
             ),
-            null, null, false, //make sure JsonReader is used
-            false, false
+            null,
+            null,
+            false, //make sure JsonReader is used
+            false,
+            false
         ),
-        "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp"
+        "kafka.newheader.",
+        "kafka.newkey.key",
+        "kafka.newts.timestamp"
     );
   }
@@ -124,8 +139,11 @@ public class KafkaInputFormatTest
         // Key Format
         new JsonInputFormat(
             new JSONPathSpec(true, ImmutableList.of()),
-            null, null, false, //make sure JsonReader is used
-            false, false
+            null,
+            null,
+            false, //make sure JsonReader is used
+            false,
+            false
         ),
         // Value Format
         new JsonInputFormat(
@@ -140,16 +158,21 @@ public class KafkaInputFormatTest
                     new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
                 )
             ),
-            null, null, false, //make sure JsonReader is used
-            false, false
+            null,
+            null,
+            false, //make sure JsonReader is used
+            false,
+            false
         ),
-        "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp"
+        "kafka.newheader.",
+        "kafka.newkey.key",
+        "kafka.newts.timestamp"
     );

     Assert.assertEquals(format, kif);
     final byte[] formatBytes = mapper.writeValueAsBytes(format);
     final byte[] kifBytes = mapper.writeValueAsBytes(kif);
-    Assert.assertTrue(Arrays.equals(formatBytes, kifBytes));
+    Assert.assertArrayEquals(formatBytes, kifBytes);
   }

   @Test
@@ -158,7 +181,8 @@ public class KafkaInputFormatTest
     final byte[] key = StringUtils.toUtf8(
         "{\n"
        + " \"key\": \"sampleKey\"\n"
-       + "}");
+       + "}"
+    );

     final byte[] payload = StringUtils.toUtf8(
         "{\n"
@@ -169,23 +193,26 @@ public class KafkaInputFormatTest
        + " \"o\": {\n"
        + " \"mg\": 1\n"
        + " }\n"
-       + "}");
+       + "}"
+    );

     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
-    inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
-        "sample", 0, 0, timestamp,
-        null, null, 0, 0,
-        key, payload, headers));
+    inputEntity = makeInputEntity(key, payload, headers);

     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newheader.encoding",
-                "kafka.newheader.kafkapkc",
-                "kafka.newts.timestamp"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newheader.encoding",
+                        "kafka.newheader.kafkapkc",
+                        "kafka.newts.timestamp"
+                    )
+                )
+            ),
             ColumnsFilter.all()
         ),
         newSettableByteEntity(inputEntity),
@@ -198,8 +225,20 @@ public class KafkaInputFormatTest
       while (iterator.hasNext()) {
         final InputRow row = iterator.next();

+        Assert.assertEquals(
+            Arrays.asList(
+                "bar",
+                "foo",
+                "kafka.newheader.encoding",
+                "kafka.newheader.kafkapkc",
+                "kafka.newts.timestamp"
+            ),
+            row.getDimensions()
+        );
+
         // Payload verifications
+        // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
+        // but test reading them anyway since it isn't technically illegal
         Assert.assertEquals(DateTimes.of("2021-06-25"), row.getTimestamp());
         Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
         Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
@@ -211,10 +250,14 @@ public class KafkaInputFormatTest
         // Header verification
         Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
         Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc")));
-        Assert.assertEquals(String.valueOf(DateTimes.of("2021-06-24").getMillis()),
-            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")));
-        Assert.assertEquals("2021-06-25",
-            Iterables.getOnlyElement(row.getDimension("timestamp")));
+        Assert.assertEquals(
+            String.valueOf(DateTimes.of("2021-06-24").getMillis()),
+            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
+        );
+        Assert.assertEquals(
+            "2021-06-25",
+            Iterables.getOnlyElement(row.getDimension("timestamp"))
+        );

         // Key verification
         Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key")));
@@ -243,23 +286,26 @@ public class KafkaInputFormatTest
        + " \"o\": {\n"
        + " \"mg\": 1\n"
        + " }\n"
-       + "}");
+       + "}"
+    );

     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
-    inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
-        "sample", 0, 0, timestamp,
-        null, null, 0, 0,
-        null, payload, headers));
+    inputEntity = makeInputEntity(null, payload, headers);

     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newheader.encoding",
-                "kafka.newheader.kafkapkc",
-                "kafka.newts.timestamp"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newheader.encoding",
+                        "kafka.newheader.kafkapkc",
+                        "kafka.newts.timestamp"
+                    )
+                )
+            ),
             ColumnsFilter.all()
         ),
         newSettableByteEntity(inputEntity),
@@ -289,23 +335,29 @@ public class KafkaInputFormatTest
     Iterable<Header> sample_header_with_ts = Iterables.unmodifiableIterable(
         Iterables.concat(
             SAMPLE_HEADERS,
-            ImmutableList.of(new Header() {
-              @Override
-              public String key()
-              {
-                return "headerTs";
-              }
-
-              @Override
-              public byte[] value()
-              {
-                return "2021-06-24".getBytes(StandardCharsets.UTF_8);
-              }
-            })));
+            ImmutableList.of(
+                new Header()
+                {
+                  @Override
+                  public String key()
+                  {
+                    return "headerTs";
+                  }
+
+                  @Override
+                  public byte[] value()
+                  {
+                    return "2021-06-24".getBytes(StandardCharsets.UTF_8);
+                  }
+                }
+            )
+        )
+    );

     final byte[] key = StringUtils.toUtf8(
         "{\n"
        + " \"key\": \"sampleKey\"\n"
-       + "}");
+       + "}"
+    );

     final byte[] payload = StringUtils.toUtf8(
         "{\n"
@@ -316,22 +368,25 @@ public class KafkaInputFormatTest
        + " \"o\": {\n"
        + " \"mg\": 1\n"
        + " }\n"
-       + "}");
+       + "}"
+    );

     Headers headers = new RecordHeaders(sample_header_with_ts);
-    inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
-        "sample", 0, 0, timestamp,
-        null, null, 0, 0,
-        key, payload, headers));
+    inputEntity = makeInputEntity(key, payload, headers);

     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("kafka.newheader.headerTs", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newheader.encoding",
-                "kafka.newheader.kafkapkc"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newheader.encoding",
+                        "kafka.newheader.kafkapkc"
+                    )
+                )
+            ),
             ColumnsFilter.all()
         ),
         newSettableByteEntity(inputEntity),
@@ -345,6 +400,9 @@ public class KafkaInputFormatTest
         final InputRow row = iterator.next();

         // Payload verifications
+        // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
+        // but test reading them anyway since it isn't technically illegal
         Assert.assertEquals(DateTimes.of("2021-06-24"), row.getTimestamp());
         Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
         Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
@@ -356,12 +414,18 @@ public class KafkaInputFormatTest
         // Header verification
         Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
         Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc")));
-        Assert.assertEquals(String.valueOf(DateTimes.of("2021-06-24").getMillis()),
-            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")));
-        Assert.assertEquals("2021-06-24",
-            Iterables.getOnlyElement(row.getDimension("kafka.newheader.headerTs")));
-        Assert.assertEquals("2021-06-24",
-            Iterables.getOnlyElement(row.getDimension("timestamp")));
+        Assert.assertEquals(
+            String.valueOf(DateTimes.of("2021-06-24").getMillis()),
+            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
+        );
+        Assert.assertEquals(
+            "2021-06-24",
+            Iterables.getOnlyElement(row.getDimension("kafka.newheader.headerTs"))
+        );
+        Assert.assertEquals(
+            "2021-06-24",
+            Iterables.getOnlyElement(row.getDimension("timestamp"))
+        );

         // Key verification
         Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key")));
@@ -389,13 +453,11 @@ public class KafkaInputFormatTest
        + " \"o\": {\n"
        + " \"mg\": 1\n"
        + " }\n"
-       + "}");
+       + "}"
+    );

     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
-    inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
-        "sample", 0, 0, timestamp,
-        null, null, 0, 0,
-        null, payload, headers));
+    inputEntity = makeInputEntity(null, payload, headers);

     KafkaInputFormat localFormat = new KafkaInputFormat(
         null,
@@ -422,10 +484,15 @@ public class KafkaInputFormatTest
     final InputEntityReader reader = localFormat.createReader(
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newts.timestamp"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newts.timestamp"
+                    )
+                )
+            ),
             ColumnsFilter.all()
         ),
         newSettableByteEntity(inputEntity),
@@ -440,6 +507,8 @@ public class KafkaInputFormatTest
         final InputRow row = iterator.next();

         // Key verification
+        // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
+        // but test reading them anyway since it isn't technically illegal
         Assert.assertTrue(row.getDimension("kafka.newkey.key").isEmpty());
         Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
         Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
@@ -465,7 +534,8 @@ public class KafkaInputFormatTest
       keys[i] = StringUtils.toUtf8(
           "{\n"
          + " \"key\": \"sampleKey-" + i + "\"\n"
-         + "}");
+         + "}"
+      );
     }
     keys[2] = null;
@@ -480,7 +550,8 @@ public class KafkaInputFormatTest
          + " \"o\": {\n"
          + " \"mg\": 1\n"
          + " }\n"
-         + "}");
+         + "}"
+      );
     }

     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
@@ -489,12 +560,17 @@ public class KafkaInputFormatTest
     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newheader.encoding",
-                "kafka.newheader.kafkapkc",
-                "kafka.newts.timestamp"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newheader.encoding",
+                        "kafka.newheader.kafkapkc",
+                        "kafka.newts.timestamp"
+                    )
+                )
+            ),
             ColumnsFilter.all()
         ),
         settableByteEntity,
@@ -504,10 +580,7 @@ public class KafkaInputFormatTest
     for (int i = 0; i < keys.length; i++) {
       headers = headers.add(new RecordHeader("indexH", String.valueOf(i).getBytes(StandardCharsets.UTF_8)));

-      inputEntity = new KafkaRecordEntity(new ConsumerRecord<>(
-          "sample", 0, 0, timestamp,
-          null, null, 0, 0,
-          keys[i], values[i], headers));
+      inputEntity = makeInputEntity(keys[i], values[i], headers);
       settableByteEntity.setEntity(inputEntity);

       final int numExpectedIterations = 1;
@@ -518,6 +591,8 @@ public class KafkaInputFormatTest
         final InputRow row = iterator.next();

         // Payload verification
+        // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
+        // but test reading them anyway since it isn't technically illegal
         Assert.assertEquals(DateTimes.of("2021-06-2" + i), row.getTimestamp());
         Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
         Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
@@ -529,10 +604,15 @@ public class KafkaInputFormatTest
         // Header verification
-        Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
+        Assert.assertEquals(
+            "application/json",
+            Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))
+        );
         Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc")));
-        Assert.assertEquals(String.valueOf(DateTimes.of("2021-06-24").getMillis()),
-            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")));
+        Assert.assertEquals(
+            String.valueOf(DateTimes.of("2021-06-24").getMillis()),
+            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
+        );

         Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("kafka.newheader.indexH")));
@@ -561,7 +641,8 @@ public class KafkaInputFormatTest
     final byte[] key = StringUtils.toUtf8(
         "{\n"
        + " \"key\": \"sampleKey\"\n"
-       + "}");
+       + "}"
+    );

     final byte[] payload = StringUtils.toUtf8(
         "{\n"
@@ -572,34 +653,26 @@ public class KafkaInputFormatTest
        + " \"o\": {\n"
        + " \"mg\": 1\n"
        + " }\n"
-       + "}");
+       + "}"
+    );

     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
-    inputEntity = new KafkaRecordEntity(
-        new ConsumerRecord<>(
-            "sample",
-            0,
-            0,
-            timestamp,
-            null,
-            null,
-            0,
-            0,
-            key,
-            payload,
-            headers
-        )
-    );
+    inputEntity = makeInputEntity(key, payload, headers);

     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("time", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
-                "bar", "foo",
-                "kafka.newheader.encoding",
-                "kafka.newheader.kafkapkc",
-                "kafka.newts.timestamp"
-            ))),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newheader.encoding",
+                        "kafka.newheader.kafkapkc",
+                        "kafka.newts.timestamp"
+                    )
+                )
+            ),
             ColumnsFilter.all()
         ),
         newSettableByteEntity(inputEntity),
@@ -617,6 +690,221 @@ public class KafkaInputFormatTest
     }
   }

+  @Test
+  public void testWithSchemaDiscovery() throws IOException
+  {
+    // testWithHeaderKeyAndValue + schemaless
+    final byte[] key = StringUtils.toUtf8(
+        "{\n"
+        + " \"key\": \"sampleKey\"\n"
+        + "}"
+    );
+
+    final byte[] payload = StringUtils.toUtf8(
+        "{\n"
+        + " \"timestamp\": \"2021-06-25\",\n"
+        + " \"bar\": null,\n"
+        + " \"foo\": \"x\",\n"
+        + " \"baz\": 4,\n"
+        + " \"o\": {\n"
+        + " \"mg\": 1\n"
+        + " }\n"
+        + "}"
+    );
+
+    Headers headers = new RecordHeaders(SAMPLE_HEADERS);
+    inputEntity = makeInputEntity(key, payload, headers);
+
+    final InputEntityReader reader = format.createReader(
+        new InputRowSchema(
+            new TimestampSpec("timestamp", "iso", null),
+            DimensionsSpec.builder().useSchemaDiscovery(true).build(),
+            ColumnsFilter.all()
+        ),
+        newSettableByteEntity(inputEntity),
+        null
+    );
+
+    final int numExpectedIterations = 1;
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      int numActualIterations = 0;
+      while (iterator.hasNext()) {
+        final InputRow row = iterator.next();
+
+        Assert.assertEquals(
+            Arrays.asList(
+                "foo",
+                "kafka.newts.timestamp",
+                "kafka.newkey.key",
+                "root_baz",
+                "o",
+                "bar",
+                "kafka.newheader.kafkapkc",
+                "path_omg",
+                "jq_omg",
+                "jq_omg2",
+                "baz",
+                "root_baz2",
+                "kafka.newheader.encoding",
+                "path_omg2"
+            ),
+            row.getDimensions()
+        );
+
+        // Payload verifications
+        Assert.assertEquals(DateTimes.of("2021-06-25"), row.getTimestamp());
+        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
+        Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
+
+        // Header verification
+        Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
+        Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc")));
+        Assert.assertEquals(
+            String.valueOf(DateTimes.of("2021-06-24").getMillis()),
+            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
+        );
+        Assert.assertEquals(
+            "2021-06-25",
+            Iterables.getOnlyElement(row.getDimension("timestamp"))
+        );
+
+        // Key verification
+        Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key")));
+
+        Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+        Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+        Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
+
+        numActualIterations++;
+      }
+
+      Assert.assertEquals(numExpectedIterations, numActualIterations);
+    }
+  }
+
+  @Test
+  public void testWithPartialDeclarationSchemaDiscovery() throws IOException
+  {
+    // testWithHeaderKeyAndValue + partial-schema + schema discovery
+    final byte[] key = StringUtils.toUtf8(
+        "{\n"
+        + " \"key\": \"sampleKey\"\n"
+        + "}"
+    );
+
+    final byte[] payload = StringUtils.toUtf8(
+        "{\n"
+        + " \"timestamp\": \"2021-06-25\",\n"
+        + " \"bar\": null,\n"
+        + " \"foo\": \"x\",\n"
+        + " \"baz\": 4,\n"
+        + " \"o\": {\n"
+        + " \"mg\": 1\n"
+        + " }\n"
+        + "}"
+    );
+
+    Headers headers = new RecordHeaders(SAMPLE_HEADERS);
+    inputEntity = makeInputEntity(key, payload, headers);
+
+    final InputEntityReader reader = format.createReader(
+        new InputRowSchema(
+            new TimestampSpec("timestamp", "iso", null),
+            DimensionsSpec.builder().setDimensions(
+                DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "kafka.newheader.kafkapkc"))
+            ).useSchemaDiscovery(true).build(),
+            ColumnsFilter.all()
+        ),
+        newSettableByteEntity(inputEntity),
+        null
+    );
+
+    final int numExpectedIterations = 1;
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      int numActualIterations = 0;
+      while (iterator.hasNext()) {
+        final InputRow row = iterator.next();
+
+        Assert.assertEquals(
+            Arrays.asList(
+                "bar",
+                "kafka.newheader.kafkapkc",
+                "foo",
+                "kafka.newts.timestamp",
+                "kafka.newkey.key",
+                "root_baz",
+                "o",
+                "path_omg",
+                "jq_omg",
+                "jq_omg2",
+                "baz",
+                "root_baz2",
+                "kafka.newheader.encoding",
+                "path_omg2"
+            ),
+            row.getDimensions()
+        );
+
+        // Payload verifications
+        Assert.assertEquals(DateTimes.of("2021-06-25"), row.getTimestamp());
+        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
+        Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
+
+        // Header verification
+        Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
+        Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc")));
+        Assert.assertEquals(
+            String.valueOf(DateTimes.of("2021-06-24").getMillis()),
+            Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
+        );
+        Assert.assertEquals(
+            "2021-06-25",
+            Iterables.getOnlyElement(row.getDimension("timestamp"))
+        );
+
+        // Key verification
+        Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key")));
+
+        Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+        Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+        Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
+
+        numActualIterations++;
+      }
+
+      Assert.assertEquals(numExpectedIterations, numActualIterations);
+    }
+  }
+
+  private KafkaRecordEntity makeInputEntity(byte[] key, byte[] payload, Headers headers)
+  {
+    return new KafkaRecordEntity(
+        new ConsumerRecord<>(
+            "sample",
+            0,
+            0,
+            timestamp,
+            null,
+            0,
+            0,
+            key,
+            payload,
+            headers,
+            Optional.empty()
+        )
+    );
+  }
+
   private SettableByteEntity<KafkaRecordEntity> newSettableByteEntity(KafkaRecordEntity kafkaRecordEntity)
   {
     SettableByteEntity<KafkaRecordEntity> settableByteEntity = new SettableByteEntity<>();

MapInputRowParser.java

@@ -32,6 +32,7 @@ import org.joda.time.DateTime;
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -66,6 +67,23 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
     return parse(inputRowSchema.getTimestampSpec(), inputRowSchema.getDimensionsSpec(), theMap);
   }

+  @VisibleForTesting
+  static InputRow parse(
+      TimestampSpec timestampSpec,
+      DimensionsSpec dimensionsSpec,
+      Map<String, Object> theMap
+  ) throws ParseException
+  {
+    final List<String> dimensionsToUse = findDimensions(
+        timestampSpec,
+        dimensionsSpec,
+        theMap == null ? Collections.emptySet() : theMap.keySet()
+    );
+    final DateTime timestamp = parseTimestamp(timestampSpec, theMap);
+    return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
+  }
+
   /**
    * Finds the final set of dimension names to use for {@link InputRow}.
    * There are 3 cases here.
@@ -80,17 +98,17 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
    * In any case, the returned list does not include any dimensions in {@link DimensionsSpec#getDimensionExclusions()}
    * or {@link TimestampSpec#getTimestampColumn()}.
    */
-  private static List<String> findDimensions(
+  public static List<String> findDimensions(
       TimestampSpec timestampSpec,
       DimensionsSpec dimensionsSpec,
-      Map<String, Object> rawInputRow
+      Set<String> fields
   )
   {
     final String timestampColumn = timestampSpec.getTimestampColumn();
     final Set<String> exclusions = dimensionsSpec.getDimensionExclusions();
     if (dimensionsSpec.isIncludeAllDimensions() || dimensionsSpec.useSchemaDiscovery()) {
       LinkedHashSet<String> dimensions = new LinkedHashSet<>(dimensionsSpec.getDimensionNames());
-      for (String field : rawInputRow.keySet()) {
+      for (String field : fields) {
         if (timestampColumn.equals(field) || exclusions.contains(field)) {
           continue;
         }
@@ -102,7 +120,7 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
       return dimensionsSpec.getDimensionNames();
     } else {
       List<String> dimensions = new ArrayList<>();
-      for (String field : rawInputRow.keySet()) {
+      for (String field : fields) {
         if (timestampColumn.equals(field) || exclusions.contains(field)) {
           continue;
         }
@@ -113,19 +131,6 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
     }
   }

-  @VisibleForTesting
-  static InputRow parse(
-      TimestampSpec timestampSpec,
-      DimensionsSpec dimensionsSpec,
-      Map<String, Object> theMap
-  ) throws ParseException
-  {
-    final List<String> dimensionsToUse = findDimensions(timestampSpec, dimensionsSpec, theMap);
-    final DateTime timestamp = parseTimestamp(timestampSpec, theMap);
-    return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
-  }
-
   public static DateTime parseTimestamp(TimestampSpec timestampSpec, Map<String, Object> theMap)
   {
     final DateTime timestamp;
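
A companion sketch of the other two cases described in the findDimensions javadoc above (the discovery case is sketched near the top of this page); the class name and data are illustrative, not part of the commit:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;

import java.util.Set;

public class FindDimensionsCases
{
  public static void main(String[] args)
  {
    final TimestampSpec ts = new TimestampSpec("timestamp", "iso", null);
    final Set<String> fields = ImmutableSet.of("timestamp", "foo", "bar");

    // Case: dimensions are explicitly declared and discovery is off;
    // the declared list wins and discovered fields are ignored.
    final DimensionsSpec explicit = new DimensionsSpec(
        DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))
    );
    System.out.println(MapInputRowParser.findDimensions(ts, explicit, fields)); // [foo]

    // Case: nothing declared and no discovery; every incoming field except
    // the timestamp column and any configured exclusions is used.
    final DimensionsSpec empty = DimensionsSpec.builder().build();
    System.out.println(MapInputRowParser.findDimensions(ts, empty, fields)); // [foo, bar]
  }
}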