Avoid conversion to String in JsonReader, JsonNodeReader. (#15693)

* Avoid conversion to String in JsonReader, JsonNodeReader.

These readers were running a UTF-8 decode on the provided entity to
convert it to a String, then parsing that String as JSON. This patch
changes them to parse the provided entity's input stream directly.
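
The idea, roughly: hand Jackson the entity's InputStream instead of a
decoded String. The sketch below is illustrative only (the class and
method names are hypothetical, and it assumes Jackson and commons-io on
the classpath); it is not the exact code in this patch.

// Hedged sketch: parse JSON directly from a stream, versus decoding to a
// String first. Class and method names here are hypothetical.
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class StreamVsStringParseSketch
{
  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final JsonFactory FACTORY = MAPPER.getFactory();

  // Before: decode the entire entity into a String, then parse the String.
  static List<JsonNode> parseViaString(InputStream in) throws IOException
  {
    final String sourceString = IOUtils.toString(in, StandardCharsets.UTF_8);
    return drain(FACTORY.createParser(sourceString));
  }

  // After: feed the InputStream to Jackson directly; no intermediate String.
  static List<JsonNode> parseViaStream(InputStream in) throws IOException
  {
    return drain(FACTORY.createParser(in));
  }

  private static List<JsonNode> drain(JsonParser parser) throws IOException
  {
    final List<JsonNode> nodes = new ArrayList<>();
    try (JsonParser p = parser) {
      final MappingIterator<JsonNode> it = MAPPER.readValues(p, JsonNode.class);
      while (it.hasNext()) {
        nodes.add(it.next());
      }
    }
    return nodes;
  }

  public static void main(String[] args) throws IOException
  {
    final byte[] data = "{\"a\":1}\n{\"a\":2}".getBytes(StandardCharsets.UTF_8);
    System.out.println(parseViaStream(new ByteArrayInputStream(data)).size()); // prints 2
  }
}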

To preserve the detailed error messages that quote the offending input,
the readers now need to re-open the entity on the error path and re-read
the data. To make this possible, the InputEntity#open contract is
tightened to require that entities can be re-opened, and existing
InputEntity implementations are updated to allow re-opening.
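
As a sketch of what the tightened contract enables (the helper name is
hypothetical, not part of this patch; it mirrors the error path visible
in the JsonReader/JsonNodeReader diffs below): the reader stream-parses
the entity, and only on a malformed-JSON failure opens it a second time
to recover the raw text for the ParseException message.

// Hedged sketch; the helper is hypothetical, but the classes and
// constructors used here appear in this patch's diffs.
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.java.util.common.parsers.ParseException;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ReopenOnErrorSketch
{
  static List<JsonNode> parseOrExplain(InputEntity entity, ObjectMapper mapper) throws IOException
  {
    final List<JsonNode> nodes = new ArrayList<>();
    // First pass: parse straight from the entity's stream.
    try (JsonParser parser = mapper.getFactory().createParser(entity.open())) {
      final MappingIterator<JsonNode> it = mapper.readValues(parser, JsonNode.class);
      while (it.hasNext()) {
        nodes.add(it.next());
      }
      return nodes;
    }
    catch (RuntimeException e) {
      // MappingIterator wraps malformed-JSON errors in a RuntimeException.
      if (e.getCause() instanceof JsonParseException) {
        // Second call to open(): legal under the updated InputEntity contract,
        // used only to build a readable error message.
        final String rowAsString = IOUtils.toString(entity.open(), StandardCharsets.UTF_8);
        throw new ParseException(rowAsString, e, "Unable to parse row [%s]", rowAsString);
      }
      throw e;
    }
  }
}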

This patch also renames JsonLineReaderBenchmark to JsonInputFormatBenchmark,
updates it to benchmark all three JSON readers, and adds a case that reads
fields out of the parsed row (rather than only creating it).

* Fixes for static analysis.

* Implement intermediateRowAsString in JsonReader.

* Enhanced JsonInputFormatBenchmark.

Renames JsonLineReaderBenchmark to JsonInputFormatBenchmark, and enhances it to
test various readers (JsonReader, JsonLineReader, JsonNodeReader) as well as
to test with/without field discovery.
Gian Merlino 2024-03-26 08:16:05 -07:00 committed by GitHub
parent f29c8ac368
commit 58a8a23243
11 changed files with 393 additions and 233 deletions

View File

@@ -65,6 +65,11 @@
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>

View File

@@ -0,0 +1,319 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.JsonLineReader;
import org.apache.druid.data.input.impl.JsonNodeReader;
import org.apache.druid.data.input.impl.JsonReader;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.transform.TransformSpec;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
* Tests {@link JsonInputFormat} delegates, one per {@link ReaderType}.
*
* Output is in nanoseconds per parse (or parse and read) of {@link #DATA_STRING}.
*/
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@OperationsPerInvocation(JsonInputFormatBenchmark.NUM_EVENTS)
@Warmup(iterations = 5)
@Measurement(iterations = 10)
@Fork(value = 1)
public class JsonInputFormatBenchmark
{
enum ReaderType
{
READER(JsonReader.class) {
@Override
public JsonInputFormat createFormat(JSONPathSpec flattenSpec)
{
return new JsonInputFormat(flattenSpec, null, null, false, false).withLineSplittable(false);
}
},
LINE_READER(JsonLineReader.class) {
@Override
public JsonInputFormat createFormat(JSONPathSpec flattenSpec)
{
return new JsonInputFormat(flattenSpec, null, null, null, null).withLineSplittable(true);
}
},
NODE_READER(JsonNodeReader.class) {
@Override
public JsonInputFormat createFormat(JSONPathSpec flattenSpec)
{
return new JsonInputFormat(flattenSpec, null, null, false, true).withLineSplittable(false);
}
};
private final Class<? extends InputEntityReader> clazz;
ReaderType(Class<? extends InputEntityReader> clazz)
{
this.clazz = clazz;
}
public abstract JsonInputFormat createFormat(JSONPathSpec flattenSpec);
}
public static final int NUM_EVENTS = 1000;
private static final String DATA_STRING =
"{" +
"\"stack\":\"mainstack\"," +
"\"metadata\":" +
"{" +
"\"application\":\"applicationname\"," +
"\"detail\":\"tm\"," +
"\"id\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\"," +
"\"idtwo\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\"," +
"\"sequence\":\"v008\"," +
"\"stack\":\"mainstack\"," +
"\"taskId\":\"12345678-1234-1234-1234-1234567890ab\"," +
"\"taskIdTwo\":\"12345678-1234-1234-1234-1234567890ab\"" +
"}," +
"\"_cluster_\":\"kafka\"," +
"\"_id_\":\"12345678-1234-1234-1234-1234567890ab\"," +
"\"_offset_\":12111398526," +
"\"type\":\"CUMULATIVE_DOUBLE\"," +
"\"version\":\"v1\"," +
"\"timestamp\":1670425782281," +
"\"point\":{\"seconds\":1670425782,\"nanos\":217000000,\"value\":0}," +
"\"_kafka_timestamp_\":1670425782304," +
"\"_partition_\":60," +
"\"ec2_instance_id\":\"i-1234567890\"," +
"\"name\":\"packets_received\"," +
"\"_topic_\":\"test_topic\"}";
private static final List<String> FIELDS_TO_READ =
ImmutableList.of(
"stack",
"_cluster_",
"_id_",
"_offset_",
"type",
"version",
"_kafka_timestamp_",
"_partition_",
"ec2_instance_id",
"name",
"_topic",
"root_type",
"path_app",
"jq_app"
);
ReaderType readerType;
InputRowSchema inputRowSchema;
InputEntityReader reader;
JsonInputFormat format;
List<Function<InputRow, Object>> fieldFunctions;
ByteEntity data;
@Param({"reader", "node_reader", "line_reader"})
private String readerTypeString;
/**
* If false: only read {@link #FIELDS_TO_READ}. If true: discover and read all fields.
*/
@Param({"false", "true"})
private boolean discovery;
@Setup
public void setUpTrial() throws Exception
{
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final byte[] dataUtf8 = StringUtils.toUtf8(DATA_STRING);
for (int i = 0; i < NUM_EVENTS; i++) {
baos.write(dataUtf8);
baos.write(new byte[]{'\n'});
}
final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
final DimensionsSpec dimensionsSpec;
if (discovery) {
// Discovered schema, excluding uninteresting fields that are not in FIELDS_TO_READ.
final Set<String> exclusions = Sets.difference(
TestHelper.makeJsonMapper()
.readValue(DATA_STRING, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)
.keySet(),
ImmutableSet.copyOf(FIELDS_TO_READ)
);
dimensionsSpec = DimensionsSpec.builder()
.useSchemaDiscovery(true)
.setDimensionExclusions(ImmutableList.copyOf(exclusions))
.build();
} else {
// Fully defined schema.
dimensionsSpec = DimensionsSpec.builder()
.setDimensions(DimensionsSpec.getDefaultSchemas(FIELDS_TO_READ))
.build();
}
data = new ByteEntity(baos.toByteArray());
readerType = ReaderType.valueOf(StringUtils.toUpperCase(readerTypeString));
inputRowSchema = new InputRowSchema(
timestampSpec,
dimensionsSpec,
InputRowSchemas.createColumnsFilter(
timestampSpec,
dimensionsSpec,
TransformSpec.NONE,
new AggregatorFactory[0]
)
);
format = readerType.createFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_type", "type"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_app", "$.metadata.application"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_app", ".metadata.application")
)
)
);
final RowAdapter<InputRow> rowAdapter = format.createRowAdapter(inputRowSchema);
fieldFunctions = new ArrayList<>(FIELDS_TO_READ.size());
for (final String field : FIELDS_TO_READ) {
fieldFunctions.add(rowAdapter.columnFunction(field));
}
reader = format.createReader(inputRowSchema, data, null);
if (reader.getClass() != readerType.clazz) {
throw new ISE(
"Expected class[%s] for readerType[%s], got[%s]",
readerType.clazz,
readerTypeString,
reader.getClass()
);
}
}
/**
* Benchmark parsing, but not reading fields.
*/
@Benchmark
public void parse(final Blackhole blackhole) throws IOException
{
data.getBuffer().rewind();
int counted = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
while (iterator.hasNext()) {
final InputRow row = iterator.next();
if (row != null) {
counted += 1;
blackhole.consume(row);
}
}
}
if (counted != NUM_EVENTS) {
throw new RuntimeException("invalid number of loops, counted = " + counted);
}
}
/**
* Benchmark parsing and reading {@link #FIELDS_TO_READ}. More realistic than {@link #parse(Blackhole)}.
*/
@Benchmark
public void parseAndRead(final Blackhole blackhole) throws IOException
{
data.getBuffer().rewind();
int counted = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
while (iterator.hasNext()) {
final InputRow row = iterator.next();
if (row != null) {
for (Function<InputRow, Object> fieldFunction : fieldFunctions) {
blackhole.consume(fieldFunction.apply(row));
}
counted += 1;
}
}
}
if (counted != NUM_EVENTS) {
throw new RuntimeException("invalid number of loops, counted = " + counted);
}
}
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(JsonInputFormatBenchmark.class.getSimpleName())
.build();
new Runner(opt).run();
}
}

View File

@@ -1,174 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
@Fork(value = 1)
public class JsonLineReaderBenchmark
{
private static final int NUM_EVENTS = 1000;
InputEntityReader reader;
JsonInputFormat format;
byte[] data;
@Setup(Level.Invocation)
public void prepareReader()
{
ByteEntity source = new ByteEntity(data);
reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
ColumnsFilter.all()
),
source,
null
);
}
@Setup
public void prepareData() throws Exception
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
String dataString = "{" +
"\"stack\":\"mainstack\"," +
"\"metadata\":" +
"{" +
"\"application\":\"applicationname\"," +
"\"detail\":\"tm\"," +
"\"id\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\"," +
"\"idtwo\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\"," +
"\"sequence\":\"v008\"," +
"\"stack\":\"mainstack\"," +
"\"taskId\":\"12345678-1234-1234-1234-1234567890ab\"," +
"\"taskIdTwo\":\"12345678-1234-1234-1234-1234567890ab\"" +
"}," +
"\"_cluster_\":\"kafka\"," +
"\"_id_\":\"12345678-1234-1234-1234-1234567890ab\"," +
"\"_offset_\":12111398526," +
"\"type\":\"CUMULATIVE_DOUBLE\"," +
"\"version\":\"v1\"," +
"\"timestamp\":1670425782281," +
"\"point\":{\"seconds\":1670425782,\"nanos\":217000000,\"value\":0}," +
"\"_kafka_timestamp_\":1670425782304," +
"\"_partition_\":60," +
"\"ec2_instance_id\":\"i-1234567890\"," +
"\"name\":\"packets_received\"," +
"\"_topic_\":\"test_topic\"}";
for (int i = 0; i < NUM_EVENTS; i++) {
baos.write(StringUtils.toUtf8(dataString));
baos.write(new byte[]{'\n'});
}
data = baos.toByteArray();
}
@Setup
public void prepareFormat()
{
format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
null,
null
);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void baseline(final Blackhole blackhole) throws IOException
{
int counted = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
while (iterator.hasNext()) {
final InputRow row = iterator.next();
if (row != null) {
counted += 1;
}
blackhole.consume(row);
}
}
if (counted != NUM_EVENTS) {
throw new RuntimeException("invalid number of loops, counted = " + counted);
}
}
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(JsonLineReaderBenchmark.class.getSimpleName())
.build();
new Runner(opt).run();
}
}

View File

@@ -19,7 +19,6 @@
package org.apache.druid.indexing.input;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -69,8 +68,7 @@ public class InputRowSchemas
*
* @see InputRowSchema#getColumnsFilter()
*/
@VisibleForTesting
static ColumnsFilter createColumnsFilter(
public static ColumnsFilter createColumnsFilter(
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final TransformSpec transformSpec,

View File

@@ -236,7 +236,8 @@ public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
@Override
public InputRowListPlusRawValues next()
{
final ByteBuffer bb = ((ByteEntity) entityIterator.next()).getBuffer();
// We need to modify the position of the buffer, so duplicate it.
final ByteBuffer bb = ((ByteEntity) entityIterator.next()).getBuffer().duplicate();
final Map<String, Object> rawColumns;
try {

View File

@@ -37,27 +37,17 @@ import java.nio.ByteBuffer;
* processing where binary records are arriving as a list but {@link org.apache.druid.data.input.InputEntityReader}, that
* parses the data, expects an {@link InputStream}. This class mimics a continuous InputStream while behind the scenes,
* binary records are being put one after the other that the InputStream consumes bytes from. One record is fully
* consumed and only then the next record is set. This class doesn't allow reading the same data twice.
* consumed and only then the next record is set.
* This class solely exists to overcome the limitations imposed by interfaces for reading and parsing data.
*
*/
@NotThreadSafe
public class SettableByteEntity<T extends ByteEntity> implements InputEntity
{
private final SettableByteBufferInputStream inputStream;
private boolean opened = false;
private T entity;
public SettableByteEntity()
{
this.inputStream = new SettableByteBufferInputStream();
}
public void setEntity(T entity)
{
inputStream.setBuffer(entity.getBuffer());
this.entity = entity;
opened = false;
}
@Nullable
@@ -72,19 +62,13 @@ public class SettableByteEntity<T extends ByteEntity> implements InputEntity
return entity;
}
/**
* This method can be called multiple times only for different data. So you can open a new input stream
* only after a new buffer is in use.
*/
@Override
public InputStream open()
{
if (opened) {
throw new IllegalArgumentException("Can't open the input stream on SettableByteEntity more than once");
}
opened = true;
return inputStream;
// Duplicate the entity buffer, because the stream will update its position.
final SettableByteBufferInputStream stream = new SettableByteBufferInputStream();
stream.setBuffer(entity.getBuffer().duplicate());
return stream;
}
public static final class SettableByteBufferInputStream extends InputStream

View File

@@ -133,7 +133,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
{
final FluentIterable<InputRow> iterable = FluentIterable
.from(valueBytes)
.transform(ByteEntity::getBuffer)
.transform(entity -> entity.getBuffer().duplicate() /* Parsing may need to modify buffer position */)
.transform(this::incrementProcessedBytes)
.transformAndConcat(parser::parseBatch);

View File

@@ -65,9 +65,7 @@ public interface InputEntity
/**
* Opens an {@link InputStream} on the input entity directly.
* This is the basic way to read the given entity.
*
* The behavior of this method is only defined fort the first call to open().
* The behavior of subsequent calls is undefined and may vary between implementations.
* This method may be called multiple times to re-read the data from the entity.
*
* @see #fetch
*/

View File

@@ -31,9 +31,13 @@ public class ByteEntity implements InputEntity
{
private final ByteBuffer buffer;
/**
* Create a new entity. The buffer is not duplicated, so it is important to ensure that its position and limit
* are not modified after this entity is created.
*/
public ByteEntity(ByteBuffer buffer)
{
this.buffer = buffer.duplicate();
this.buffer = buffer;
}
public ByteEntity(byte[] bytes)
@@ -41,6 +45,11 @@
this(ByteBuffer.wrap(bytes));
}
/**
* Return the buffer backing this entity. Calling code must not modify the mark, position, or limit of this buffer.
* If you need to modify them, call {@link ByteBuffer#duplicate()} or {@link ByteBuffer#asReadOnlyBuffer()} and
* modify the copy.
*/
public ByteBuffer getBuffer()
{
return buffer;
@@ -56,6 +65,6 @@
@Override
public InputStream open()
{
return new ByteBufferInputStream(buffer);
return new ByteBufferInputStream(buffer.duplicate());
}
}

View File

@@ -33,7 +33,6 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -43,6 +42,7 @@ import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -94,10 +94,8 @@ public class JsonNodeReader extends IntermediateRowParsingReader<JsonNode>
@Override
protected CloseableIterator<JsonNode> intermediateRowIterator() throws IOException
{
final String sourceString = IOUtils.toString(source.open(), StringUtils.UTF8_STRING);
final List<JsonNode> jsonNodes = new ArrayList<>();
try {
JsonParser parser = jsonFactory.createParser(sourceString);
try (final JsonParser parser = jsonFactory.createParser(source.open())) {
final MappingIterator<JsonNode> delegate = mapper.readValues(parser, JsonNode.class);
while (delegate.hasNext()) {
jsonNodes.add(delegate.next());
@@ -107,9 +105,10 @@ public class JsonNodeReader extends IntermediateRowParsingReader<JsonNode>
//convert Jackson's JsonParseException into druid's exception for further processing
//JsonParseException will be thrown from MappingIterator#hasNext or MappingIterator#next when input json text is ill-formed
if (e.getCause() instanceof JsonParseException) {
final String rowAsString = IOUtils.toString(source.open(), StandardCharsets.UTF_8);
jsonNodes.add(
new ParseExceptionMarkerJsonNode(
new ParseException(sourceString, e, "Unable to parse row [%s]", sourceString)
new ParseException(rowAsString, e, "Unable to parse row [%s]", rowAsString)
)
);
} else {
@@ -117,13 +116,14 @@ public class JsonNodeReader extends IntermediateRowParsingReader<JsonNode>
}
}
if (CollectionUtils.isNullOrEmpty(jsonNodes)) {
if (jsonNodes.isEmpty()) {
final String rowAsString = IOUtils.toString(source.open(), StandardCharsets.UTF_8);
jsonNodes.add(
new ParseExceptionMarkerJsonNode(
new ParseException(
sourceString,
rowAsString,
"Unable to parse [%s] as the intermediateRow resulted in empty input row",
sourceString
rowAsString
)
)
);

View File

@@ -33,16 +33,17 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -59,7 +60,7 @@ import java.util.Map;
*
* For more information, see: https://github.com/apache/druid/pull/10383
*/
public class JsonReader extends IntermediateRowParsingReader<String>
public class JsonReader extends IntermediateRowParsingReader<InputEntity>
{
private final ObjectFlattener<JsonNode> flattener;
private final ObjectMapper mapper;
@@ -89,11 +90,9 @@ public class JsonReader extends IntermediateRowParsingReader<String>
}
@Override
protected CloseableIterator<String> intermediateRowIterator() throws IOException
protected CloseableIterator<InputEntity> intermediateRowIterator()
{
return CloseableIterators.withEmptyBaggage(
Iterators.singletonIterator(IOUtils.toString(source.open(), StringUtils.UTF8_STRING))
);
return CloseableIterators.withEmptyBaggage(Iterators.singletonIterator(source));
}
@Override
@@ -103,39 +102,59 @@ public class JsonReader extends IntermediateRowParsingReader<String>
}
@Override
protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
protected String intermediateRowAsString(@Nullable InputEntity entity)
{
final List<InputRow> inputRows;
try (JsonParser parser = jsonFactory.createParser(intermediateRow)) {
if (entity == null) {
return "null";
} else {
try {
return IOUtils.toString(entity.open(), StandardCharsets.UTF_8);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Override
protected List<InputRow> parseInputRows(InputEntity entity) throws IOException, ParseException
{
final List<InputRow> inputRows = new ArrayList<>();
try (JsonParser parser = jsonFactory.createParser(entity.open())) {
final MappingIterator<JsonNode> delegate = mapper.readValues(parser, JsonNode.class);
inputRows = FluentIterable.from(() -> delegate)
.transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
.toList();
while (delegate.hasNext()) {
final JsonNode row = delegate.next();
inputRows.add(MapInputRowParser.parse(inputRowSchema, flattener.flatten(row)));
}
}
catch (RuntimeException e) {
//convert Jackson's JsonParseException into druid's exception for further processing
//JsonParseException will be thrown from MappingIterator#hasNext or MappingIterator#next when input json text is ill-formed
if (e.getCause() instanceof JsonParseException) {
throw new ParseException(intermediateRow, e, "Unable to parse row [%s]", intermediateRow);
final String rowAsString = IOUtils.toString(entity.open(), StandardCharsets.UTF_8);
throw new ParseException(rowAsString, e, "Unable to parse row [%s]", rowAsString);
}
//throw unknown exception
throw e;
}
if (CollectionUtils.isNullOrEmpty(inputRows)) {
if (inputRows.isEmpty()) {
final String rowAsString = IOUtils.toString(entity.open(), StandardCharsets.UTF_8);
throw new ParseException(
intermediateRow,
rowAsString,
"Unable to parse [%s] as the intermediateRow resulted in empty input row",
intermediateRow
rowAsString
);
}
return inputRows;
}
@Override
protected List<Map<String, Object>> toMap(String intermediateRow) throws IOException
protected List<Map<String, Object>> toMap(InputEntity entity) throws IOException
{
try (JsonParser parser = jsonFactory.createParser(intermediateRow)) {
try (JsonParser parser = jsonFactory.createParser(entity.open())) {
final MappingIterator<Map> delegate = mapper.readValues(parser, Map.class);
return FluentIterable.from(() -> delegate)
.transform(map -> (Map<String, Object>) map)
@@ -145,7 +164,8 @@ public class JsonReader extends IntermediateRowParsingReader<String>
//convert Jackson's JsonParseException into druid's exception for further processing
//JsonParseException will be thrown from MappingIterator#hasNext or MappingIterator#next when input json text is ill-formed
if (e.getCause() instanceof JsonParseException) {
throw new ParseException(intermediateRow, e, "Unable to parse row [%s]", intermediateRow);
final String rowAsString = IOUtils.toString(entity.open(), StandardCharsets.UTF_8);
throw new ParseException(rowAsString, e, "Unable to parse row [%s]", rowAsString);
}
//throw unknown exception