KafkaInputFormat: Fix handling of CSV/TSV keyFormat. (#17226)

* KafkaInputFormat: Fix handling of CSV/TSV keyFormat.

Follow-up to #16630, which fixed a similar issue for the valueFormat.

* Simplify.
Gian Merlino 2024-10-03 13:05:09 -07:00 committed by GitHub
parent db7cc4634c
commit fc00664760
3 changed files with 126 additions and 16 deletions
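
This commit gives the key-side reader its own row schema, discovering whatever fields the keyFormat parses instead of reusing a schema derived from the ingestion spec, and it drops the requirement that key parsers return MapBasedInputRow. A minimal sketch of the configuration in question, mirroring the new test added below (column and prefix names here are illustrative, not from the commit):

import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import java.util.Collections;

// KafkaInputFormat with a CSV keyFormat: only the first CSV column of the key
// is retained, stored under the configured key column name ("kafka.key" here).
InputFormat format = new KafkaInputFormat(
    new KafkaStringHeaderFormat(null),
    // keyFormat: the declared column name is irrelevant; KafkaInputReader keeps the first parsed field
    new CsvInputFormat(Collections.singletonList("key"), null, false, false, 0, null),
    // valueFormat: JSON with flattening enabled
    new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), null, null, false, false),
    "kafka.header.",
    "kafka.key",
    "kafka.timestamp",
    "kafka.topic"
);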

extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java

@@ -21,11 +21,13 @@ package org.apache.druid.data.input.kafkainput;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.data.input.kafka.KafkaRecordEntity;
@@ -33,7 +35,6 @@ import org.apache.druid.indexing.seekablestream.SettableByteEntity;
 import org.apache.druid.java.util.common.DateTimes;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Objects;
@@ -111,7 +112,12 @@ public class KafkaInputFormat implements InputFormat
         (record.getRecord().key() == null) ?
         null :
         JsonInputFormat.withLineSplittable(keyFormat, false).createReader(
-            newInputRowSchema,
+            // for keys, discover all fields; in KafkaInputReader we will pick the first one.
+            new InputRowSchema(
+                dummyTimestampSpec,
+                DimensionsSpec.builder().useSchemaDiscovery(true).build(),
+                ColumnsFilter.all()
+            ),
             new ByteEntity(record.getRecord().key()),
             temporaryDirectory
         ),

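In other words, the key reader no longer borrows the ingestion spec's dimensions; it discovers whatever fields the keyFormat produces and filters nothing out, leaving KafkaInputReader to pick the first field. A simplified, hypothetical restatement of the schema built above (the real code uses the class's dummyTimestampSpec field; the TimestampSpec below is a stand-in):

// Key rows carry no real timestamp, so a constant dummy spec suffices (stand-in values).
InputRowSchema keySchema = new InputRowSchema(
    new TimestampSpec(null, null, DateTimes.EPOCH),
    DimensionsSpec.builder().useSchemaDiscovery(true).build(),
    ColumnsFilter.all()
);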
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java

@@ -58,14 +58,13 @@ public class KafkaInputReader implements InputEntityReader
   private final String topicColumnName;
 
   /**
-   *
-   * @param inputRowSchema Actual schema from the ingestion spec
-   * @param source kafka record containing header, key & value that is wrapped inside SettableByteEntity
+   * @param inputRowSchema       Actual schema from the ingestion spec
+   * @param source               kafka record containing header, key & value that is wrapped inside SettableByteEntity
    * @param headerParserSupplier Function to get Header parser for parsing the header section, kafkaInputFormat allows users to skip header parsing section and hence can be null
-   * @param keyParserSupplier Function to get Key parser for key section, can be null as well. Key parser supplier can also return a null key parser.
-   * @param valueParser Value parser is a required section in kafkaInputFormat. It cannot be null.
-   * @param keyColumnName Default key column name
-   * @param timestampColumnName Default kafka record's timestamp column name
+   * @param keyParserSupplier    Function to get Key parser for key section, can be null as well. Key parser supplier can also return a null key parser.
+   * @param valueParser          Value parser is a required section in kafkaInputFormat. It cannot be null.
+   * @param keyColumnName        Default key column name
+   * @param timestampColumnName  Default kafka record's timestamp column name
    */
   public KafkaInputReader(
       InputRowSchema inputRowSchema,
@@ -144,14 +143,9 @@ public class KafkaInputReader implements InputEntityReader
     try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
       // Key currently only takes the first row and ignores the rest.
       if (keyIterator.hasNext()) {
-        // Return type for the key parser should be of type MapBasedInputRow
-        // Parsers returning other types are not compatible currently.
-        MapBasedInputRow keyRow = (MapBasedInputRow) keyIterator.next();
+        final InputRow keyRow = keyIterator.next();
         // Add the key to the mergeList only if the key string is not already present
-        mergedHeaderMap.putIfAbsent(
-            keyColumnName,
-            keyRow.getEvent().entrySet().stream().findFirst().get().getValue()
-        );
+        mergedHeaderMap.computeIfAbsent(keyColumnName, ignored -> getFirstValue(keyRow));
       }
     }
     catch (ClassCastException e) {
@@ -344,4 +338,15 @@ public class KafkaInputReader implements InputEntityReader
       }
     };
   }
+
+  /**
+   * Get the first value from an {@link InputRow}. This is the first element from {@link InputRow#getDimensions()}
+   * if there are any. If there are not any, returns null. This method is used to extract keys.
+   */
+  @Nullable
+  static Object getFirstValue(final InputRow row)
+  {
+    final List<String> dimensions = row.getDimensions();
+    return !dimensions.isEmpty() ? row.getRaw(dimensions.get(0)) : null;
+  }
 }

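The new helper pairs with the computeIfAbsent() call above: unlike the old putIfAbsent(), computeIfAbsent() evaluates the extractor only when the key column is absent, and records no mapping when the extracted key is null, so an empty key row cannot throw the way the old findFirst().get() chain could. A hypothetical usage from the same package (getFirstValue is package-private; imports: org.apache.druid.data.input.MapBasedInputRow, com.google.common.collect.ImmutableList, com.google.common.collect.ImmutableMap):

// Dimension order determines which value counts as "first".
InputRow row = new MapBasedInputRow(
    DateTimes.nowUtc(),
    ImmutableList.of("key", "extra"),
    ImmutableMap.of("key", "sampleKey", "extra", "x")
);
Object first = KafkaInputReader.getFirstValue(row);  // "sampleKey"

// No dimensions -> null, and computeIfAbsent() then leaves mergedHeaderMap untouched.
Object none = KafkaInputReader.getFirstValue(
    new MapBasedInputRow(DateTimes.nowUtc(), ImmutableList.of(), ImmutableMap.of())
);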
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java

@@ -693,6 +693,105 @@ public class KafkaInputFormatTest
     }
   }
 
+  @Test
+  public void testKeyInCsvFormat() throws IOException
+  {
+    format = new KafkaInputFormat(
+        new KafkaStringHeaderFormat(null),
+        // Key Format
+        new CsvInputFormat(
+            // name of the field doesn't matter, it just has to be something
+            Collections.singletonList("foo"),
+            null,
+            false,
+            false,
+            0,
+            null
+        ),
+        // Value Format
+        new JsonInputFormat(
+            new JSONPathSpec(true, ImmutableList.of()),
+            null,
+            null,
+            false,
+            false
+        ),
+        "kafka.newheader.",
+        "kafka.newkey.key",
+        "kafka.newts.timestamp",
+        "kafka.newtopic.topic"
+    );
+
+    Headers headers = new RecordHeaders(SAMPLE_HEADERS);
+    KafkaRecordEntity inputEntity =
+        makeInputEntity(
+            // x,y,z are ignored; key will be "sampleKey"
+            StringUtils.toUtf8("sampleKey,x,y,z"),
+            SIMPLE_JSON_VALUE_BYTES,
+            headers
+        );
+
+    final InputEntityReader reader = format.createReader(
+        new InputRowSchema(
+            new TimestampSpec("timestamp", "iso", null),
+            new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(
+                    ImmutableList.of(
+                        "bar",
+                        "foo",
+                        "kafka.newkey.key",
+                        "kafka.newheader.encoding",
+                        "kafka.newheader.kafkapkc",
+                        "kafka.newts.timestamp",
+                        "kafka.newtopic.topic"
+                    )
+                )
+            ),
+            ColumnsFilter.all()
+        ),
+        newSettableByteEntity(inputEntity),
+        null
+    );
+
+    final int numExpectedIterations = 1;
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      int numActualIterations = 0;
+      while (iterator.hasNext()) {
+        final InputRow row = iterator.next();
+
+        Assert.assertEquals(
+            Arrays.asList(
+                "bar",
+                "foo",
+                "kafka.newkey.key",
+                "kafka.newheader.encoding",
+                "kafka.newheader.kafkapkc",
+                "kafka.newts.timestamp",
+                "kafka.newtopic.topic"
+            ),
+            row.getDimensions()
+        );
+
+        // Payload verifications
+        // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
+        // but test reading them anyway since it isn't technically illegal
+        Assert.assertEquals(DateTimes.of("2021-06-25"), row.getTimestamp());
+        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+        Assert.assertTrue(row.getDimension("bar").isEmpty());
+
+        verifyHeader(row);
+
+        // Key verification
+        Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key")));
+
+        numActualIterations++;
+      }
+      Assert.assertEquals(numExpectedIterations, numActualIterations);
+    }
+  }
+
+  @Test
+  public void testValueInCsvFormat() throws IOException
+  {