fix KafkaInputFormat with nested columns by delegating to underlying inputRow map instead of eagerly copying (#13406)

This commit is contained in:
Clint Wylie 2022-11-28 12:28:07 -08:00 committed by GitHub
parent a2d5e335f3
commit 4b58f5f23c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 124 additions and 60 deletions

View File

@ -30,24 +30,24 @@ import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
public class KafkaInputReader implements InputEntityReader
{
private static final Logger log = new Logger(KafkaInputReader.class);
private final InputRowSchema inputRowSchema;
private final SettableByteEntity<KafkaRecordEntity> source;
private final Function<KafkaRecordEntity, KafkaHeaderReader> headerParserSupplier;
@ -85,7 +85,60 @@ public class KafkaInputReader implements InputEntityReader
this.timestampColumnName = timestampColumnName;
}
private List<String> getFinalDimensionList(HashSet<String> newDimensions)
@Override
public CloseableIterator<InputRow> read() throws IOException
{
final KafkaRecordEntity record = source.getEntity();
final Map<String, Object> mergedHeaderMap = new HashMap<>();
if (headerParserSupplier != null) {
KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
List<Pair<String, Object>> headerList = headerParser.read();
for (Pair<String, Object> ele : headerList) {
mergedHeaderMap.put(ele.lhs, ele.rhs);
}
}
// Add kafka record timestamp to the mergelist, we will skip record timestamp if the same key exists already in
// the header list
mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
if (keyParser != null) {
try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
// Key currently only takes the first row and ignores the rest.
if (keyIterator.hasNext()) {
// Return type for the key parser should be of type MapBasedInputRow
// Parsers returning other types are not compatible currently.
MapBasedInputRow keyRow = (MapBasedInputRow) keyIterator.next();
// Add the key to the mergeList only if the key string is not already present
mergedHeaderMap.putIfAbsent(
keyColumnName,
keyRow.getEvent().entrySet().stream().findFirst().get().getValue()
);
}
}
catch (ClassCastException e) {
throw new IOException(
"Unsupported keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows"
);
}
}
// Ignore tombstone records that have null values.
if (record.getRecord().value() != null) {
return buildBlendedRows(valueParser, mergedHeaderMap);
} else {
return buildRowsWithoutValuePayload(mergedHeaderMap);
}
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
}
private List<String> getFinalDimensionList(Set<String> newDimensions)
{
final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
if (!schemaDimensions.isEmpty()) {
@ -97,11 +150,14 @@ public class KafkaInputReader implements InputEntityReader
}
}
private CloseableIterator<InputRow> buildBlendedRows(InputEntityReader valueParser, Map<String, Object> headerKeyList) throws IOException
private CloseableIterator<InputRow> buildBlendedRows(
InputEntityReader valueParser,
Map<String, Object> headerKeyList
) throws IOException
{
return valueParser.read().map(
r -> {
MapBasedInputRow valueRow;
final MapBasedInputRow valueRow;
try {
// Return type for the value parser should be of type MapBasedInputRow
// Parsers returning other types are not compatible currently.
@ -113,14 +169,9 @@ public class KafkaInputReader implements InputEntityReader
"Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows"
);
}
Map<String, Object> event = new HashMap<>(headerKeyList);
/* Currently we prefer payload attributes if there is a collision in names.
We can change this beahvior in later changes with a config knob. This default
behavior lets easy porting of existing inputFormats to the new one without any changes.
*/
event.putAll(valueRow.getEvent());
HashSet<String> newDimensions = new HashSet<String>(valueRow.getDimensions());
final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
newDimensions.addAll(headerKeyList.keySet());
// Remove the dummy timestamp added in KafkaInputFormat
newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
@ -136,60 +187,70 @@ public class KafkaInputReader implements InputEntityReader
private CloseableIterator<InputRow> buildRowsWithoutValuePayload(Map<String, Object> headerKeyList)
{
HashSet<String> newDimensions = new HashSet<String>(headerKeyList.keySet());
InputRow row = new MapBasedInputRow(
final InputRow row = new MapBasedInputRow(
inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
getFinalDimensionList(newDimensions),
getFinalDimensionList(headerKeyList.keySet()),
headerKeyList
);
List<InputRow> rows = Collections.singletonList(row);
final List<InputRow> rows = Collections.singletonList(row);
return CloseableIterators.withEmptyBaggage(rows.iterator());
}
@Override
public CloseableIterator<InputRow> read() throws IOException
/**
* Builds a map that blends two {@link Map}, presenting the combined keyset of both maps, and preferring to read
* from the first map and falling back to the second map if the value is not present.
*
* This strategy is used rather than just copying the values of the keyset into a new map so that any 'flattening'
* machinery (such as {@link Map} created by {@link org.apache.druid.java.util.common.parsers.ObjectFlatteners}) is
* still in place to be lazily evaluated instead of eagerly copying.
*/
private static Map<String, Object> buildBlendedEventMap(Map<String, Object> map, Map<String, Object> fallback)
{
KafkaRecordEntity record = source.getEntity();
Map<String, Object> mergeMap = new HashMap<>();
if (headerParserSupplier != null) {
KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
List<Pair<String, Object>> headerList = headerParser.read();
for (Pair<String, Object> ele : headerList) {
mergeMap.put(ele.lhs, ele.rhs);
final Set<String> keySet = new HashSet<>(fallback.keySet());
keySet.addAll(map.keySet());
return new AbstractMap<String, Object>()
{
@Override
public Object get(Object key)
{
return map.getOrDefault((String) key, fallback.get(key));
}
}
// Add kafka record timestamp to the mergelist, we will skip record timestamp if the same key exists already in the header list
mergeMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
if (keyParser != null) {
try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
// Key currently only takes the first row and ignores the rest.
if (keyIterator.hasNext()) {
// Return type for the key parser should be of type MapBasedInputRow
// Parsers returning other types are not compatible currently.
MapBasedInputRow keyRow = (MapBasedInputRow) keyIterator.next();
// Add the key to the mergeList only if the key string is not already present
mergeMap.putIfAbsent(keyColumnName, keyRow.getEvent().entrySet().stream().findFirst().get().getValue());
}
@Override
public Set<String> keySet()
{
return keySet;
}
catch (ClassCastException e) {
throw new IOException("Unsupported input format in keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows");
@Override
public Set<Entry<String, Object>> entrySet()
{
return keySet().stream()
.map(
field -> new Entry<String, Object>()
{
@Override
public String getKey()
{
return field;
}
@Override
public Object getValue()
{
return get(field);
}
@Override
public Object setValue(final Object value)
{
throw new UnsupportedOperationException();
}
}
)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
}
// Ignore tombstone records that have null values.
if (record.getRecord().value() != null) {
return buildBlendedRows(valueParser, mergeMap);
} else {
return buildRowsWithoutValuePayload(mergeMap);
}
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
};
}
}

View File

@ -22,12 +22,12 @@ package org.apache.druid.data.input.kafkainput;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
@ -205,6 +205,7 @@ public class KafkaInputFormatTest
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
// Header verification
Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
@ -342,7 +343,6 @@ public class KafkaInputFormatTest
while (iterator.hasNext()) {
final InputRow row = iterator.next();
final MapBasedInputRow mrow = (MapBasedInputRow) row;
// Payload verifications
Assert.assertEquals(DateTimes.of("2021-06-24"), row.getTimestamp());
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
@ -350,6 +350,7 @@ public class KafkaInputFormatTest
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
// Header verification
Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
@ -444,6 +445,7 @@ public class KafkaInputFormatTest
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
numActualIterations++;
}
@ -521,6 +523,7 @@ public class KafkaInputFormatTest
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index")));