Reuse the InputEntityReader in SettableByteEntityReader (#12269)

* Reuse the InputEntityReader in SettableByteEntityReader

* Fix logic

* Fix kafka streaming ingestion

* Add Tests for kafka input format change

* Address review comments
Abhishek Agarwal 2022-03-10 04:08:31 +05:30 committed by GitHub
parent 9cfb23935f
commit 6346b9561d
7 changed files with 299 additions and 61 deletions

KafkaInputFormat.java

@ -28,9 +28,11 @@ import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.DateTimes;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Objects;
@ -79,28 +81,29 @@ public class KafkaInputFormat implements InputFormat
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
KafkaRecordEntity record = (KafkaRecordEntity) source;
SettableByteEntity<KafkaRecordEntity> settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
InputRowSchema newInputRowSchema = new InputRowSchema(dummyTimestampSpec, inputRowSchema.getDimensionsSpec(), inputRowSchema.getColumnsFilter());
return new KafkaInputReader(
inputRowSchema,
record,
settableByteEntitySource,
(headerFormat == null) ?
null :
headerFormat.createReader(record.getRecord().headers(), headerColumnPrefix),
(keyFormat == null || record.getRecord().key() == null) ?
null :
keyFormat.createReader(
newInputRowSchema,
new ByteEntity(record.getRecord().key()),
temporaryDirectory
),
(record.getRecord().value() == null) ?
null :
valueFormat.createReader(
newInputRowSchema,
source,
temporaryDirectory
),
null :
record -> headerFormat.createReader(record.getRecord().headers(), headerColumnPrefix),
(keyFormat == null) ?
null :
record ->
(record.getRecord().key() == null) ?
null :
keyFormat.createReader(
newInputRowSchema,
new ByteEntity(record.getRecord().key()),
temporaryDirectory
),
valueFormat.createReader(
newInputRowSchema,
source,
temporaryDirectory
),
keyColumnName,
timestampColumnName
);
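
The substance of this hunk: createReader no longer casts the source to a concrete KafkaRecordEntity. It casts to SettableByteEntity<KafkaRecordEntity> and hands KafkaInputReader lazy Function suppliers for the header and key readers, while the value reader is built once against the settable source. A minimal sketch of that supplier pattern, using simplified stand-in types rather than the Druid interfaces:

import java.util.function.Function;

// Stand-ins for the Druid types; names here are illustrative only.
interface Reader { void read(); }

class PerRecordReaders
{
  private final Reader valueReader;                        // built once, reused for every record
  private final Function<byte[], Reader> keyReaderFactory; // resolved per record, may be null

  PerRecordReaders(Reader valueReader, Function<byte[], Reader> keyReaderFactory)
  {
    this.valueReader = valueReader;
    this.keyReaderFactory = keyReaderFactory;
  }

  void process(byte[] key)
  {
    // Resolve the key reader against the current record; a null factory means the key
    // section is skipped entirely, and the factory itself may return null when the
    // record carries no key.
    Reader keyReader = keyReaderFactory == null ? null : keyReaderFactory.apply(key);
    if (keyReader != null) {
      keyReader.read();
    }
    valueReader.read(); // always present: the value format is required
  }
}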

KafkaInputReader.java

@ -27,27 +27,31 @@ import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
public class KafkaInputReader implements InputEntityReader
{
private static final Logger log = new Logger(KafkaInputReader.class);
private final InputRowSchema inputRowSchema;
private final KafkaRecordEntity record;
private final KafkaHeaderReader headerParser;
private final InputEntityReader keyParser;
private final SettableByteEntity<KafkaRecordEntity> source;
private final Function<KafkaRecordEntity, KafkaHeaderReader> headerParserSupplier;
private final Function<KafkaRecordEntity, InputEntityReader> keyParserSupplier;
private final InputEntityReader valueParser;
private final String keyColumnName;
private final String timestampColumnName;
@ -55,27 +59,27 @@ public class KafkaInputReader implements InputEntityReader
/**
*
* @param inputRowSchema Actual schema from the ingestion spec
* @param record kafka record containing header, key & value
* @param headerParser Header parser for parsing the header section, kafkaInputFormat allows users to skip header parsing section and hence an be null
* @param keyParser Key parser for key section, can be null as well
* @param valueParser Value parser is a required section in kafkaInputFormat, but because of tombstone records we can have a null parser here.
* @param source kafka record containing header, key & value that is wrapped inside SettableByteEntity
* @param headerParserSupplier Function to get the header parser for parsing the header section; kafkaInputFormat allows users to skip header parsing, hence this can be null
* @param keyParserSupplier Function to get the key parser for the key section; can be null as well, and the supplier itself may also return a null key parser
* @param valueParser Value parser is a required section in kafkaInputFormat. It cannot be null.
* @param keyColumnName Default key column name
* @param timestampColumnName Default kafka record's timestamp column name
*/
public KafkaInputReader(
InputRowSchema inputRowSchema,
KafkaRecordEntity record,
KafkaHeaderReader headerParser,
InputEntityReader keyParser,
SettableByteEntity<KafkaRecordEntity> source,
@Nullable Function<KafkaRecordEntity, KafkaHeaderReader> headerParserSupplier,
@Nullable Function<KafkaRecordEntity, InputEntityReader> keyParserSupplier,
InputEntityReader valueParser,
String keyColumnName,
String timestampColumnName
)
{
this.inputRowSchema = inputRowSchema;
this.record = record;
this.headerParser = headerParser;
this.keyParser = keyParser;
this.source = source;
this.headerParserSupplier = headerParserSupplier;
this.keyParserSupplier = keyParserSupplier;
this.valueParser = valueParser;
this.keyColumnName = keyColumnName;
this.timestampColumnName = timestampColumnName;
@ -145,8 +149,10 @@ public class KafkaInputReader implements InputEntityReader
@Override
public CloseableIterator<InputRow> read() throws IOException
{
KafkaRecordEntity record = source.getEntity();
Map<String, Object> mergeMap = new HashMap<>();
if (headerParser != null) {
if (headerParserSupplier != null) {
KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
List<Pair<String, Object>> headerList = headerParser.read();
for (Pair<String, Object> ele : headerList) {
mergeMap.put(ele.lhs, ele.rhs);
@ -156,6 +162,7 @@ public class KafkaInputReader implements InputEntityReader
// Add the Kafka record timestamp to the merge map; the record timestamp is skipped if the same key already exists in the header list
mergeMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
if (keyParser != null) {
try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
// Key currently only takes the first row and ignores the rest.
@ -172,7 +179,8 @@ public class KafkaInputReader implements InputEntityReader
}
}
if (valueParser != null) {
// Ignore tombstone records that have null values.
if (record.getRecord().value() != null) {
return buildBlendedRows(valueParser, mergeMap);
} else {
return buildRowsWithoutValuePayload(mergeMap);
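
Seen end to end, read() now resolves everything against whatever record the SettableByteEntity currently holds: header values go into a merge map first, the Kafka timestamp is added only if no header already claimed that column, the key contributes at most one row, and a null value (a tombstone) falls back to rows built from the merge map alone. A condensed, illustrative version of that control flow, with simplified types in place of the Druid ones:

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the merge order in KafkaInputReader.read(); the real class
// parses keys and values with nested InputEntityReaders instead of raw strings.
class MergeFlowSketch
{
  List<Map<String, Object>> read(
      Map<String, Object> headerValues,
      byte[] key,
      byte[] value,
      long recordTimestamp,
      String timestampColumnName
  )
  {
    Map<String, Object> mergeMap = new HashMap<>(headerValues);
    // The record timestamp is skipped if a header already claimed the column.
    mergeMap.putIfAbsent(timestampColumnName, recordTimestamp);
    if (key != null) {
      // Only the first row of the parsed key is kept in the real reader.
      mergeMap.put("kafka.key", new String(key, StandardCharsets.UTF_8));
    }
    if (value == null) {
      // Tombstone record: build a row without a value payload.
      return Collections.singletonList(mergeMap);
    }
    // The real code blends mergeMap into every row parsed from the value payload.
    return blendWithValueRows(mergeMap, value);
  }

  private List<Map<String, Object>> blendWithValueRows(Map<String, Object> mergeMap, byte[] value)
  {
    Map<String, Object> row = new HashMap<>(mergeMap);
    row.put("value.size", value.length); // placeholder for the parsed value fields
    return Collections.singletonList(row);
  }
}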

KafkaInputFormatTest.java

@ -32,6 +32,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@ -41,6 +42,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.Assert;
import org.junit.Before;
@ -49,6 +51,7 @@ import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
public class KafkaInputFormatTest
{
@ -180,7 +183,7 @@ public class KafkaInputFormatTest
))),
ColumnsFilter.all()
),
inputEntity,
newSettableByteEntity(inputEntity),
null
);
@ -253,7 +256,7 @@ public class KafkaInputFormatTest
))),
ColumnsFilter.all()
),
inputEntity,
newSettableByteEntity(inputEntity),
null
);
@ -325,7 +328,7 @@ public class KafkaInputFormatTest
))),
ColumnsFilter.all()
),
inputEntity,
newSettableByteEntity(inputEntity),
null
);
@ -418,7 +421,7 @@ public class KafkaInputFormatTest
))),
ColumnsFilter.all()
),
inputEntity,
newSettableByteEntity(inputEntity),
null
);
@ -443,4 +446,110 @@ public class KafkaInputFormatTest
}
}
@Test
public void testWithMultipleMixedRecords() throws IOException
{
final byte[][] keys = new byte[5][];
final byte[][] values = new byte[5][];
for (int i = 0; i < keys.length; i++) {
keys[i] = StringUtils.toUtf8(
"{\n"
+ " \"key\": \"sampleKey-" + i + "\"\n"
+ "}");
}
keys[2] = null;
for (int i = 0; i < values.length; i++) {
values[i] = StringUtils.toUtf8(
"{\n"
+ " \"timestamp\": \"2021-06-2" + i + "\",\n"
+ " \"bar\": null,\n"
+ " \"foo\": \"x\",\n"
+ " \"baz\": 4,\n"
+ " \"index\": " + i + ",\n"
+ " \"o\": {\n"
+ " \"mg\": 1\n"
+ " }\n"
+ "}");
}
Headers headers = new RecordHeaders(SAMPLE_HEADERS);
SettableByteEntity<KafkaRecordEntity> settableByteEntity = new SettableByteEntity<>();
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
"bar", "foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
"kafka.newts.timestamp"
))),
ColumnsFilter.all()
),
settableByteEntity,
null
);
for (int i = 0; i < keys.length; i++) {
headers = headers.add(new RecordHeader("indexH", String.valueOf(i).getBytes(StandardCharsets.UTF_8)));
inputEntity = new KafkaRecordEntity(new ConsumerRecord<>(
"sample", 0, 0, timestamp,
null, null, 0, 0,
keys[i], values[i], headers));
settableByteEntity.setEntity(inputEntity);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
// Payload verification
Assert.assertEquals(DateTimes.of("2021-06-2" + i), row.getTimestamp());
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index")));
// Header verification
Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc")));
Assert.assertEquals(String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")));
Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("kafka.newheader.indexH")));
// Key verification
if (i == 2) {
Assert.assertEquals(Collections.emptyList(), row.getDimension("kafka.newkey.key"));
} else {
Assert.assertEquals("sampleKey-" + i, Iterables.getOnlyElement(row.getDimension("kafka.newkey.key")));
}
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
}
private SettableByteEntity<KafkaRecordEntity> newSettableByteEntity(KafkaRecordEntity kafkaRecordEntity)
{
SettableByteEntity<KafkaRecordEntity> settableByteEntity = new SettableByteEntity<>();
settableByteEntity.setEntity(kafkaRecordEntity);
return settableByteEntity;
}
}
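
The new testWithMultipleMixedRecords pins down the one-reader-many-records contract: the reader is created once against an initially empty SettableByteEntity, and each iteration swaps in the next KafkaRecordEntity with setEntity before calling read() again. Stripped of assertions, the pattern it exercises looks roughly like this (records, schema, and process are placeholders; the rest are the classes shown in this diff):

// Sketch of the reuse pattern the test verifies; not a runnable unit test.
SettableByteEntity<KafkaRecordEntity> entity = new SettableByteEntity<>();
InputEntityReader reader = format.createReader(schema, entity, null);

for (KafkaRecordEntity record : records) {   // records: hypothetical input list
  entity.setEntity(record);                  // swap in the next record's bytes
  try (CloseableIterator<InputRow> rows = reader.read()) {  // same reader instance
    while (rows.hasNext()) {
      process(rows.next());                  // process: placeholder for the assertions
    }
  }
}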

KafkaIndexTaskTest.java

@ -87,6 +87,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTestBase;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.test.TestDataSegmentAnnouncer;
import org.apache.druid.indexing.test.TestDataSegmentKiller;
@ -3124,13 +3125,14 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
final KafkaRecordEntity recordEntity = (KafkaRecordEntity) source;
final InputEntityReader delegate = baseInputFormat.createReader(inputRowSchema, recordEntity, temporaryDirectory);
final SettableByteEntity<KafkaRecordEntity> settableByteEntity = (SettableByteEntity<KafkaRecordEntity>) source;
final InputEntityReader delegate = baseInputFormat.createReader(inputRowSchema, source, temporaryDirectory);
return new InputEntityReader()
{
@Override
public CloseableIterator<InputRow> read() throws IOException
{
KafkaRecordEntity recordEntity = (KafkaRecordEntity) settableByteEntity.getEntity();
return delegate.read().map(
r -> {
MapBasedInputRow row = (MapBasedInputRow) r;
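
The change in this test wrapper is the same lazy-binding idea: the anonymous reader no longer captures a KafkaRecordEntity at createReader time, it asks the SettableByteEntity for the current record inside read(). A self-contained sketch of that deferral, with stand-in types rather than the Druid interfaces:

import java.util.function.Supplier;

// Stand-ins; illustrative only.
interface RowReader { String read(); }

final class DeferredRecordReader implements RowReader
{
  private final Supplier<String> currentRecord; // e.g. a reference like settableEntity::getEntity
  private final RowReader delegate;

  DeferredRecordReader(Supplier<String> currentRecord, RowReader delegate)
  {
    this.currentRecord = currentRecord;
    this.delegate = delegate;
  }

  @Override
  public String read()
  {
    // The record is looked up at read() time, so one wrapper instance stays
    // correct after SettableByteEntity.setEntity() swaps in a new record.
    return delegate.read() + " [record=" + currentRecord.get() + "]";
  }
}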

SettableByteEntity.java

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.io.ByteBufferInputStream;
import org.apache.druid.java.util.common.IAE;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
/**
* This class is only to be used with {@link SettableByteEntityReader} and {@code KafkaInputFormat}. It is useful for stream
* processing where binary records arrive as a list but the {@link org.apache.druid.data.input.InputEntityReader} that
* parses the data expects an {@link InputStream}. This class mimics a continuous InputStream while, behind the scenes,
* binary records are set one after the other for the InputStream to consume bytes from. A record is fully
* consumed before the next record is set. This class doesn't allow reading the same data twice.
* This class solely exists to overcome the limitations imposed by interfaces for reading and parsing data.
*
*/
@NotThreadSafe
public class SettableByteEntity<T extends ByteEntity> implements InputEntity
{
private final SettableByteBufferInputStream inputStream;
private boolean opened = false;
private T entity;
public SettableByteEntity()
{
this.inputStream = new SettableByteBufferInputStream();
}
public void setEntity(T entity)
{
inputStream.setBuffer(entity.getBuffer());
this.entity = entity;
opened = false;
}
@Nullable
@Override
public URI getUri()
{
return null;
}
public T getEntity()
{
return entity;
}
/**
* This method can be called multiple times, but only for different data. A new input stream can be opened
* only after a new buffer is in use.
*/
@Override
public InputStream open()
{
if (opened) {
throw new IllegalArgumentException("Can't open the input stream on SettableByteEntity more than once");
}
opened = true;
return inputStream;
}
public static final class SettableByteBufferInputStream extends InputStream
{
@Nullable
private ByteBufferInputStream delegate;
public void setBuffer(ByteBuffer newBuffer)
{
if (null != delegate && available() > 0) {
throw new IAE("New data cannot be set in buffer till all the old data has been read");
}
this.delegate = new ByteBufferInputStream(newBuffer);
}
@Override
public int read()
{
Preconditions.checkNotNull(delegate, "Buffer is not set");
return delegate.read();
}
@Override
public int read(byte[] bytes, int off, int len)
{
Preconditions.checkNotNull(delegate, "Buffer is not set");
return delegate.read(bytes, off, len);
}
@Override
public int available()
{
Preconditions.checkNotNull(delegate, "Buffer is not set");
return delegate.available();
}
}
}
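
The guard in setBuffer is what makes reuse safe: a new record's buffer can only be installed once the previous one has been fully drained, and open() refuses to be called twice for the same record. A small self-contained illustration of the swap-when-drained contract (not the Druid class, just the same idea over a raw ByteBuffer):

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Minimal illustration of the swap-when-drained contract; not the Druid class.
final class SwappableBufferStream extends InputStream
{
  private ByteBuffer buffer;

  void setBuffer(ByteBuffer newBuffer)
  {
    if (buffer != null && buffer.hasRemaining()) {
      throw new IllegalStateException("Previous record has not been fully read yet");
    }
    buffer = newBuffer;
  }

  @Override
  public int read()
  {
    return (buffer == null || !buffer.hasRemaining()) ? -1 : (buffer.get() & 0xFF);
  }

  public static void main(String[] args)
  {
    SwappableBufferStream in = new SwappableBufferStream();
    in.setBuffer(ByteBuffer.wrap("record-1".getBytes(StandardCharsets.UTF_8)));
    while (in.read() != -1) { /* drain record 1 */ }
    // Only now is it legal to install the next record's bytes.
    in.setBuffer(ByteBuffer.wrap("record-2".getBytes(StandardCharsets.UTF_8)));
    while (in.read() != -1) { /* drain record 2 */ }
  }
}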

SettableByteEntityReader.java

@ -29,7 +29,6 @@ import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.Transformer;
import org.apache.druid.segment.transform.TransformingInputEntityReader;
import java.io.File;
@ -39,14 +38,10 @@ import java.io.IOException;
* A settable {@link InputEntityReader}. This class is intended to be used for only stream parsing in Kafka or Kinesis
* indexing.
*/
class SettableByteEntityReader implements InputEntityReader
class SettableByteEntityReader<T extends ByteEntity> implements InputEntityReader
{
private final InputFormat inputFormat;
private final InputRowSchema inputRowSchema;
private final Transformer transformer;
private final File indexingTmpDir;
private InputEntityReader delegate;
private final SettableByteEntity<T> entity;
private final InputEntityReader delegate;
SettableByteEntityReader(
InputFormat inputFormat,
@ -55,21 +50,18 @@ class SettableByteEntityReader implements InputEntityReader
File indexingTmpDir
)
{
this.inputFormat = Preconditions.checkNotNull(inputFormat, "inputFormat");
this.inputRowSchema = inputRowSchema;
this.transformer = transformSpec.toTransformer();
this.indexingTmpDir = indexingTmpDir;
Preconditions.checkNotNull(inputFormat, "inputFormat");
final InputFormat format = (inputFormat instanceof JsonInputFormat) ? ((JsonInputFormat) inputFormat).withLineSplittable(false) : inputFormat;
this.entity = new SettableByteEntity<>();
this.delegate = new TransformingInputEntityReader(
format.createReader(inputRowSchema, entity, indexingTmpDir),
transformSpec.toTransformer()
);
}
void setEntity(ByteEntity entity)
void setEntity(T entity)
{
InputFormat format = (inputFormat instanceof JsonInputFormat) ? ((JsonInputFormat) inputFormat).withLineSplittable(false) : inputFormat;
this.delegate = new TransformingInputEntityReader(
// Yes, we are creating a new reader for every stream chunk.
// This should be fine as long as initializing a reader is cheap which it is for now.
format.createReader(inputRowSchema, entity, indexingTmpDir),
transformer
);
this.entity.setEntity(entity);
}
@Override
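
This is the heart of the PR: the TransformingInputEntityReader is now built once in the constructor, bound to an internal SettableByteEntity, and setEntity merely swaps the bytes underneath it instead of allocating a new reader and transformer for every stream chunk. A reduced sketch of the build-once / swap-bytes structure, with stand-in types in place of the Druid classes:

import java.util.function.Function;

// Stand-ins; illustrative only.
interface Parser { String parse(); }

final class ReusableReaderSketch
{
  static final class MutableSource { byte[] bytes; }

  private final MutableSource source = new MutableSource();
  private final Parser parser;

  // The parser is created exactly once, bound to the mutable source,
  // mirroring how the delegate reader is created in the constructor above.
  ReusableReaderSketch(Function<MutableSource, Parser> parserFactory)
  {
    this.parser = parserFactory.apply(source);
  }

  // Cheap per-record call: no new parser objects are allocated.
  void setBytes(byte[] bytes)
  {
    source.bytes = bytes;
  }

  String read()
  {
    return parser.parse();
  }
}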

StreamChunkParser.java

@ -51,7 +51,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
@Nullable
private final InputRowParser<ByteBuffer> parser;
@Nullable
private final SettableByteEntityReader byteEntityReader;
private final SettableByteEntityReader<RecordType> byteEntityReader;
private final Predicate<InputRow> rowFilter;
private final RowIngestionMeters rowIngestionMeters;
private final ParseExceptionHandler parseExceptionHandler;
@ -76,7 +76,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
// parser is already decorated with transformSpec in DataSchema
this.parser = parser;
if (inputFormat != null) {
this.byteEntityReader = new SettableByteEntityReader(
this.byteEntityReader = new SettableByteEntityReader<>(
inputFormat,
inputRowSchema,
transformSpec,
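
With SettableByteEntityReader parameterized on RecordType, StreamChunkParser can hand Kafka tasks the full KafkaRecordEntity (headers, key, timestamp) rather than a bare ByteEntity payload. The per-record loop it drives presumably reduces to something like the sketch below (parseAndIndex and chunk are placeholders; the methods used are the ones shown in this diff):

// Illustrative per-chunk loop, not the exact StreamChunkParser code.
for (RecordType record : chunk) {
  byteEntityReader.setEntity(record);                       // typed, no cast needed
  try (CloseableIterator<InputRow> rows = byteEntityReader.read()) {
    while (rows.hasNext()) {
      parseAndIndex(rows.next());                           // placeholder for downstream handling
    }
  }
}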