diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index 458955e5807..56e94215940 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -30,24 +30,24 @@ import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.indexing.seekablestream.SettableByteEntity;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;

 import javax.annotation.Nullable;
-
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collectors;

 public class KafkaInputReader implements InputEntityReader
 {
-  private static final Logger log = new Logger(KafkaInputReader.class);
-
   private final InputRowSchema inputRowSchema;
   private final SettableByteEntity<KafkaRecordEntity> source;
   private final Function<KafkaRecordEntity, KafkaHeaderReader> headerParserSupplier;
@@ -85,7 +85,60 @@ public class KafkaInputReader implements InputEntityReader
     this.timestampColumnName = timestampColumnName;
   }

-  private List<String> getFinalDimensionList(HashSet<String> newDimensions)
+  @Override
+  public CloseableIterator<InputRow> read() throws IOException
+  {
+    final KafkaRecordEntity record = source.getEntity();
+    final Map<String, Object> mergedHeaderMap = new HashMap<>();
+    if (headerParserSupplier != null) {
+      KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
+      List<Pair<String, Object>> headerList = headerParser.read();
+      for (Pair<String, Object> ele : headerList) {
+        mergedHeaderMap.put(ele.lhs, ele.rhs);
+      }
+    }
+
+    // Add the Kafka record timestamp to mergedHeaderMap; the record timestamp is skipped if the same key
+    // already exists in the header list
+    mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
+
+    InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
+    if (keyParser != null) {
+      try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
+        // Key currently only takes the first row and ignores the rest.
+        if (keyIterator.hasNext()) {
+          // Return type for the key parser should be of type MapBasedInputRow
+          // Parsers returning other types are not compatible currently.
+          MapBasedInputRow keyRow = (MapBasedInputRow) keyIterator.next();
+          // Add the key to mergedHeaderMap only if the key column name is not already present
+          mergedHeaderMap.putIfAbsent(
+              keyColumnName,
+              keyRow.getEvent().entrySet().stream().findFirst().get().getValue()
+          );
+        }
+      }
+      catch (ClassCastException e) {
+        throw new IOException(
+            "Unsupported keyFormat. KafkaInputFormat only supports input formats that return MapBasedInputRow rows"
+        );
+      }
+    }
+
+    // Ignore tombstone records that have null values.
+    if (record.getRecord().value() != null) {
+      return buildBlendedRows(valueParser, mergedHeaderMap);
+    } else {
+      return buildRowsWithoutValuePayload(mergedHeaderMap);
+    }
+  }
+
+  @Override
+  public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
+  {
+    return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
+  }
+
+  private List<String> getFinalDimensionList(Set<String> newDimensions)
   {
     final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
     if (!schemaDimensions.isEmpty()) {
@@ -97,11 +150,14 @@ public class KafkaInputReader implements InputEntityReader
     }
   }

-  private CloseableIterator<InputRow> buildBlendedRows(InputEntityReader valueParser, Map<String, Object> headerKeyList) throws IOException
+  private CloseableIterator<InputRow> buildBlendedRows(
+      InputEntityReader valueParser,
+      Map<String, Object> headerKeyList
+  ) throws IOException
   {
     return valueParser.read().map(
         r -> {
-          MapBasedInputRow valueRow;
+          final MapBasedInputRow valueRow;
           try {
             // Return type for the value parser should be of type MapBasedInputRow
             // Parsers returning other types are not compatible currently.
@@ -113,14 +169,9 @@ public class KafkaInputReader implements InputEntityReader
                 "Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows"
             );
           }
-          Map<String, Object> event = new HashMap<>(headerKeyList);
-          /* Currently we prefer payload attributes if there is a collision in names.
-              We can change this beahvior in later changes with a config knob. This default
-              behavior lets easy porting of existing inputFormats to the new one without any changes.
-            */
-          event.putAll(valueRow.getEvent());
-          HashSet<String> newDimensions = new HashSet<String>(valueRow.getDimensions());
+          final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
+          final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
           newDimensions.addAll(headerKeyList.keySet());
           // Remove the dummy timestamp added in KafkaInputFormat
           newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
@@ -136,60 +187,70 @@ public class KafkaInputReader implements InputEntityReader

   private CloseableIterator<InputRow> buildRowsWithoutValuePayload(Map<String, Object> headerKeyList)
   {
-    HashSet<String> newDimensions = new HashSet<String>(headerKeyList.keySet());
-    InputRow row = new MapBasedInputRow(
+    final InputRow row = new MapBasedInputRow(
         inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
-        getFinalDimensionList(newDimensions),
+        getFinalDimensionList(headerKeyList.keySet()),
         headerKeyList
     );
-    List<InputRow> rows = Collections.singletonList(row);
+    final List<InputRow> rows = Collections.singletonList(row);
     return CloseableIterators.withEmptyBaggage(rows.iterator());
   }

-  @Override
-  public CloseableIterator<InputRow> read() throws IOException
+  /**
+   * Builds a map that blends two {@link Map}s, presenting the combined keyset of both maps, preferring to read
+   * from the first map and falling back to the second map if the value is not present.
+   *
+   * This strategy is used rather than just copying the values of the keyset into a new map so that any 'flattening'
+   * machinery (such as {@link Map}s created by {@link org.apache.druid.java.util.common.parsers.ObjectFlatteners}) is
+   * still in place to be lazily evaluated instead of eagerly copied.
+   */
+  private static Map<String, Object> buildBlendedEventMap(Map<String, Object> map, Map<String, Object> fallback)
   {
-    KafkaRecordEntity record = source.getEntity();
-    Map<String, Object> mergeMap = new HashMap<>();
-    if (headerParserSupplier != null) {
-      KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
-      List<Pair<String, Object>> headerList = headerParser.read();
-      for (Pair<String, Object> ele : headerList) {
-        mergeMap.put(ele.lhs, ele.rhs);
+    final Set<String> keySet = new HashSet<>(fallback.keySet());
+    keySet.addAll(map.keySet());
+
+    return new AbstractMap<String, Object>()
+    {
+      @Override
+      public Object get(Object key)
+      {
+        return map.getOrDefault((String) key, fallback.get(key));
       }
-    }
-    // Add kafka record timestamp to the mergelist, we will skip record timestamp if the same key exists already in the header list
-    mergeMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
-
-    InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
-    if (keyParser != null) {
-      try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
-        // Key currently only takes the first row and ignores the rest.
-        if (keyIterator.hasNext()) {
-          // Return type for the key parser should be of type MapBasedInputRow
-          // Parsers returning other types are not compatible currently.
-          MapBasedInputRow keyRow = (MapBasedInputRow) keyIterator.next();
-          // Add the key to the mergeList only if the key string is not already present
-          mergeMap.putIfAbsent(keyColumnName, keyRow.getEvent().entrySet().stream().findFirst().get().getValue());
-        }
+      @Override
+      public Set<String> keySet()
+      {
+        return keySet;
       }
-      catch (ClassCastException e) {
-        throw new IOException("Unsupported input format in keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows");
+
+      @Override
+      public Set<Entry<String, Object>> entrySet()
+      {
+        return keySet().stream()
+                       .map(
+                           field -> new Entry<String, Object>()
+                           {
+                             @Override
+                             public String getKey()
+                             {
+                               return field;
+                             }
+
+                             @Override
+                             public Object getValue()
+                             {
+                               return get(field);
+                             }
+
+                             @Override
+                             public Object setValue(final Object value)
+                             {
+                               throw new UnsupportedOperationException();
+                             }
+                           }
+                       )
+                       .collect(Collectors.toCollection(LinkedHashSet::new));
       }
-    }
-
-    // Ignore tombstone records that have null values.
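Note on the KafkaInputReader change above: buildBlendedEventMap replaces the old copy-and-putAll merge with a read-through view over the two maps, so values produced by lazy flattening machinery are only evaluated on access, while payload attributes still win on name collisions. Below is a minimal standalone sketch of that technique; all names (BlendedMapSketch, blended, payload, headers) are hypothetical, and entrySet() eagerly materializes entries for brevity where the patch builds lazy Entry objects.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class BlendedMapSketch
{
  // Read-through view that prefers "primary" and falls back to "fallback" on misses.
  static Map<String, Object> blended(Map<String, Object> primary, Map<String, Object> fallback)
  {
    final Set<String> keys = new HashSet<>(fallback.keySet());
    keys.addAll(primary.keySet());

    return new AbstractMap<String, Object>()
    {
      @Override
      public Object get(Object key)
      {
        // No values are copied up front; each lookup consults the underlying maps.
        return primary.getOrDefault((String) key, fallback.get(key));
      }

      @Override
      public Set<String> keySet()
      {
        return keys;
      }

      @Override
      public Set<Entry<String, Object>> entrySet()
      {
        // Eager for brevity; the patch returns lazy Entry objects instead.
        final Set<Entry<String, Object>> entries = new LinkedHashSet<>();
        for (String key : keys) {
          entries.add(new AbstractMap.SimpleImmutableEntry<>(key, get(key)));
        }
        return entries;
      }
    };
  }

  public static void main(String[] args)
  {
    final Map<String, Object> payload = new HashMap<>();
    payload.put("foo", "x");
    final Map<String, Object> headers = new HashMap<>();
    headers.put("foo", "from-header");
    headers.put("kafka.timestamp", 1624492800000L);

    final Map<String, Object> view = blended(payload, headers);
    System.out.println(view.get("foo"));             // x (payload wins on collisions)
    System.out.println(view.get("kafka.timestamp")); // 1624492800000 (falls back to headers)
    System.out.println(view.keySet().size());        // 2 (combined keyset)
  }
}
```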
-    if (record.getRecord().value() != null) {
-      return buildBlendedRows(valueParser, mergeMap);
-    } else {
-      return buildRowsWithoutValuePayload(mergeMap);
-    }
-  }
-
-  @Override
-  public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
-  {
-    return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
+    };
   }
 }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
index fe0b89e996f..a45730005a9 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
@@ -22,12 +22,12 @@ package org.apache.druid.data.input.kafkainput;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -205,6 +205,7 @@ public class KafkaInputFormatTest
     Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
     Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
     Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
+    Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));

     // Header verification
     Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
@@ -342,7 +343,6 @@ public class KafkaInputFormatTest

     while (iterator.hasNext()) {
       final InputRow row = iterator.next();
-      final MapBasedInputRow mrow = (MapBasedInputRow) row;
       // Payload verifications
       Assert.assertEquals(DateTimes.of("2021-06-24"), row.getTimestamp());
       Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
@@ -350,6 +350,7 @@ public class KafkaInputFormatTest
       Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
       Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
       Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
+      Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));

       // Header verification
       Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
@@ -444,6 +445,7 @@ public class KafkaInputFormatTest
       Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
       Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
       Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
+      Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));

       numActualIterations++;
     }
@@ -521,6 +523,7 @@ public class KafkaInputFormatTest
       Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
       Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
       Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
+      Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));

       Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index")));
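Note on the test changes: the added row.getRaw("o") assertions appear to verify that nested raw values (the "o" object in the JSON payload) remain reachable through the blended event map rather than being lost in a copied snapshot. Separately, as I read the patch, the merge order in read() means headers win over the Kafka record timestamp and key column, since those are added with putIfAbsent after the headers, while the value payload wins over everything inside buildBlendedEventMap. A toy illustration of that putIfAbsent precedence; MergeOrderSketch, the column names, and the values are hypothetical stand-ins for the configured timestampColumnName and keyColumnName:

```java
import java.util.HashMap;
import java.util.Map;

public class MergeOrderSketch
{
  public static void main(String[] args)
  {
    // Headers are copied in first, so later putIfAbsent calls cannot overwrite them.
    final Map<String, Object> merged = new HashMap<>();
    merged.put("kafka.timestamp", "from-header");          // a header that collides with the timestamp column
    merged.putIfAbsent("kafka.timestamp", 1624492800000L); // record timestamp: skipped, the header wins
    merged.putIfAbsent("kafka.key", "key-0");              // key column: added, since there is no collision

    System.out.println(merged.get("kafka.timestamp")); // from-header
    System.out.println(merged.get("kafka.key"));       // key-0
  }
}
```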