fix KafkaInputFormat when used with Sampler API (#13900)

* fix KafkaInputFormat when used with Sampler API

* handle key format sampling the same as value format sampling
Clint Wylie 2023-03-08 16:23:24 -08:00 committed by GitHub
parent 82f7a56475
commit c7f4bb5056
3 changed files with 243 additions and 37 deletions
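
For context on the bug: the streaming ingestion task hands KafkaInputFormat.createReader() a SettableByteEntity<KafkaRecordEntity> that it refills for each record, while the Sampler API passes the KafkaRecordEntity directly, so the old unconditional cast failed with a ClassCastException before any rows could be sampled. Below is a minimal sketch of the sampler-side call, assuming the usual Druid and kafka-clients types (package names quoted from memory; the class and wiring are illustrative, not code from this patch):

import java.io.File;
import java.nio.charset.StandardCharsets;

import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class SamplerCallPathSketch
{
  // Illustrative only: the sampler builds a reader from a bare KafkaRecordEntity,
  // not from the SettableByteEntity wrapper the streaming task uses.
  static InputEntityReader openSamplerReader(KafkaInputFormat format, InputRowSchema schema, File tmpDir)
  {
    KafkaRecordEntity entity = new KafkaRecordEntity(
        new ConsumerRecord<>("sample-topic", 0, 0L, new byte[0], "{\"dim1\":\"a\"}".getBytes(StandardCharsets.UTF_8))
    );
    // Before this patch the cast inside createReader() threw ClassCastException here;
    // with the change below the format wraps the entity itself, so read() and sample() both work.
    return format.createReader(schema, entity, tmpDir);
  }
}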

KafkaInputFormat.java

@@ -81,7 +81,13 @@ public class KafkaInputFormat implements InputFormat
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
SettableByteEntity<KafkaRecordEntity> settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
final SettableByteEntity<KafkaRecordEntity> settableByteEntitySource;
if (source instanceof SettableByteEntity) {
settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
} else {
settableByteEntitySource = new SettableByteEntity<>();
settableByteEntitySource.setEntity((KafkaRecordEntity) source);
}
InputRowSchema newInputRowSchema = new InputRowSchema(
dummyTimestampSpec,
inputRowSchema.getDimensionsSpec(),

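For contrast, a rough sketch of the streaming-task side that already worked, where a single SettableByteEntity is created up front and only its contents are swapped per record (the loop and names are illustrative, not Druid's actual task code; package names quoted from memory):

import java.io.File;
import java.util.List;

import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.SettableByteEntity;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.data.input.kafkainput.KafkaInputFormat;

class StreamingPathSketch
{
  // The streaming task reuses one settable wrapper and only swaps the entity inside it,
  // which is why createReader() originally assumed it would always receive that wrapper.
  static InputEntityReader openTaskReader(
      KafkaInputFormat format,
      InputRowSchema schema,
      File tmpDir,
      List<KafkaRecordEntity> polledRecords
  )
  {
    SettableByteEntity<KafkaRecordEntity> settable = new SettableByteEntity<>();
    InputEntityReader reader = format.createReader(schema, settable, tmpDir);
    for (KafkaRecordEntity record : polledRecords) {
      settable.setEntity(record);
      // ... the task would call reader.read() here and feed the rows to the ingestion pipeline
    }
    return reader;
  }
}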
KafkaInputReader.java

@@ -89,6 +89,43 @@ public class KafkaInputReader implements InputEntityReader
public CloseableIterator<InputRow> read() throws IOException
{
final KafkaRecordEntity record = source.getEntity();
final Map<String, Object> mergedHeaderMap = extractHeaderAndKeys(record);
// Ignore tombstone records that have null values.
if (record.getRecord().value() != null) {
return buildBlendedRows(valueParser, mergedHeaderMap);
} else {
return CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator());
}
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
final KafkaRecordEntity record = source.getEntity();
InputRowListPlusRawValues keysAndHeader = extractHeaderAndKeysSample(record);
if (record.getRecord().value() != null) {
return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues());
} else {
final List<InputRowListPlusRawValues> rows = Collections.singletonList(keysAndHeader);
return CloseableIterators.withEmptyBaggage(rows.iterator());
}
}
private List<String> getFinalDimensionList(Set<String> newDimensions)
{
final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
if (!schemaDimensions.isEmpty()) {
return schemaDimensions;
} else {
return Lists.newArrayList(
Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
);
}
}
private Map<String, Object> extractHeader(KafkaRecordEntity record)
{
final Map<String, Object> mergedHeaderMap = new HashMap<>();
if (headerParserSupplier != null) {
KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
@@ -102,7 +139,13 @@ public class KafkaInputReader implements InputEntityReader
// the header list
mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
return mergedHeaderMap;
}
private Map<String, Object> extractHeaderAndKeys(KafkaRecordEntity record) throws IOException
{
final Map<String, Object> mergedHeaderMap = extractHeader(record);
final InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
if (keyParser != null) {
try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
// Key currently only takes the first row and ignores the rest.
@@ -123,31 +166,7 @@ public class KafkaInputReader implements InputEntityReader
);
}
}
// Ignore tombstone records that have null values.
if (record.getRecord().value() != null) {
return buildBlendedRows(valueParser, mergedHeaderMap);
} else {
return buildRowsWithoutValuePayload(mergedHeaderMap);
}
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
}
private List<String> getFinalDimensionList(Set<String> newDimensions)
{
final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
if (!schemaDimensions.isEmpty()) {
return schemaDimensions;
} else {
return Lists.newArrayList(
Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
);
}
return mergedHeaderMap;
}
private CloseableIterator<InputRow> buildBlendedRows(
@@ -185,15 +204,91 @@ public class KafkaInputReader implements InputEntityReader
);
}
private CloseableIterator<InputRow> buildRowsWithoutValuePayload(Map<String, Object> headerKeyList)
private InputRowListPlusRawValues extractHeaderAndKeysSample(KafkaRecordEntity record) throws IOException
{
final InputRow row = new MapBasedInputRow(
inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
getFinalDimensionList(headerKeyList.keySet()),
headerKeyList
Map<String, Object> mergedHeaderMap = extractHeader(record);
InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
if (keyParser != null) {
try (CloseableIterator<InputRowListPlusRawValues> keyIterator = keyParser.sample()) {
// Key currently only takes the first row and ignores the rest.
if (keyIterator.hasNext()) {
// Return type for the key parser should be of type MapBasedInputRow
// Parsers returning other types are not compatible currently.
InputRowListPlusRawValues keyRow = keyIterator.next();
// Add the key to the mergeList only if the key string is not already present
mergedHeaderMap.putIfAbsent(
keyColumnName,
keyRow.getRawValues().entrySet().stream().findFirst().get().getValue()
);
return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
}
}
catch (ClassCastException e) {
throw new IOException(
"Unsupported keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows"
);
}
}
return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
}
private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
InputEntityReader valueParser,
Map<String, Object> headerKeyList
) throws IOException
{
return valueParser.sample().map(
rowAndValues -> {
if (rowAndValues.getParseException() != null) {
return rowAndValues;
}
List<InputRow> newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size());
List<Map<String, Object>> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size());
ParseException parseException = null;
for (Map<String, Object> raw : rowAndValues.getRawValuesList()) {
newRawRows.add(buildBlendedEventMap(raw, headerKeyList));
}
for (InputRow r : rowAndValues.getInputRows()) {
MapBasedInputRow valueRow = null;
try {
valueRow = (MapBasedInputRow) r;
}
catch (ClassCastException e) {
parseException = new ParseException(
null,
"Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows"
);
}
if (valueRow != null) {
final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
newDimensions.addAll(headerKeyList.keySet());
// Remove the dummy timestamp added in KafkaInputFormat
newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
newInputRows.add(
new MapBasedInputRow(
inputRowSchema.getTimestampSpec().extractTimestamp(event),
getFinalDimensionList(newDimensions),
event
)
);
}
}
return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, parseException);
}
);
}
private List<InputRow> buildInputRowsForMap(Map<String, Object> headerKeyList)
{
return Collections.singletonList(
new MapBasedInputRow(
inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
getFinalDimensionList(headerKeyList.keySet()),
headerKeyList
)
);
final List<InputRow> rows = Collections.singletonList(row);
return CloseableIterators.withEmptyBaggage(rows.iterator());
}
/**

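Taken together, the new sample() path gives the sampler the same blended view that read() produces at ingestion time: header fields, the record timestamp, the first key row, and the value fields in one raw map, with tombstones (null values) reduced to just the header and key portion. A hedged illustration of the raw values reported for a single record, assuming KafkaInputFormat's default column names ("kafka.timestamp", "kafka.key", "kafka.header.<name>"); the concrete values and the helper class are made up:

import java.util.Map;

import com.google.common.collect.ImmutableMap;

class BlendedSampleSketch
{
  // Roughly what buildBlendedRowsSample() reports as raw values for a record whose JSON
  // value is {"dim1":"a","met1":1.0}, whose key parses to "sample-key", and which carries
  // an "encoding" header, when header, key, and value formats are all configured.
  static Map<String, Object> expectedRawValues()
  {
    return ImmutableMap.of(
        "kafka.timestamp", 1678320000000L, // record timestamp merged in by extractHeader()
        "kafka.key", "sample-key",         // first row produced by the key format
        "kafka.header.encoding", "utf-8",  // parsed by the header format
        "dim1", "a",                       // fields from the value format
        "met1", 1.0
    );
  }
}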
KafkaSamplerSpecTest.java

@@ -34,6 +34,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.kafka.test.TestBroker;
@@ -94,6 +95,26 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null
);
private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP = new DataSchema(
"test_ds",
new TimestampSpec("kafka.timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
);
private static TestingCluster zkServer;
private static TestBroker kafkaServer;
@@ -126,7 +147,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
zkServer.stop();
}
@Test(timeout = 30_000L)
@Test
public void testSample()
{
insertData(generateRecords(TOPIC));
@@ -169,7 +190,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
new SamplerConfig(5, null, null, null),
new SamplerConfig(5, 5_000, null, null),
new InputSourceSampler(OBJECT_MAPPER),
OBJECT_MAPPER
);
@@ -177,6 +198,90 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
runSamplerAndCompareResponse(samplerSpec, true);
}
@Test
public void testSampleKafkaInputFormat()
{
insertData(generateRecords(TOPIC));
KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
null,
DATA_SCHEMA_KAFKA_TIMESTAMP,
null,
new KafkaSupervisorIOConfig(
TOPIC,
new KafkaInputFormat(
null,
null,
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
null
),
null,
null,
null,
kafkaServer.consumerProperties(),
null,
null,
null,
null,
true,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
new SamplerConfig(5, 5_000, null, null),
new InputSourceSampler(OBJECT_MAPPER),
OBJECT_MAPPER
);
SamplerResponse response = samplerSpec.sample();
Assert.assertEquals(5, response.getNumRowsRead());
// we can parse an extra row compared to the other sampler tests because the kafka record timestamp
// is used as the row timestamp
Assert.assertEquals(4, response.getNumRowsIndexed());
Assert.assertEquals(5, response.getData().size());
Iterator<SamplerResponse.SamplerResponseRow> it = response.getData().iterator();
SamplerResponse.SamplerResponseRow nextRow;
Map<String, Object> rawInput;
Map<String, Object> parsedInput;
for (int i = 0; i < 4; i++) {
nextRow = it.next();
Assert.assertNull(nextRow.isUnparseable());
rawInput = nextRow.getInput();
parsedInput = nextRow.getParsed();
Assert.assertTrue(rawInput.containsKey("kafka.timestamp"));
Assert.assertEquals(rawInput.get("kafka.timestamp"), parsedInput.get("__time"));
}
nextRow = it.next();
Assert.assertTrue(nextRow.isUnparseable());
Assert.assertFalse(it.hasNext());
}
@Test
public void testWithInputRowParser() throws IOException
{
@@ -245,7 +350,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
new SamplerConfig(5, null, null, null),
new SamplerConfig(5, 5_000, null, null),
new InputSourceSampler(new DefaultObjectMapper()),
OBJECT_MAPPER
);