Add JsonInputFormat option to assume newline delimited JSON, improve parse exception handling for multiline JSON (#13089)

* Add JsonInputFormat option to assume newline delimited JSON, improve handling for non-NDJSON

* Fix serde and docs

* Add PR comment check
This commit is contained in:
Jonathan Wei 2022-09-26 19:51:04 -05:00 committed by GitHub
parent e839660b6a
commit 1f1fced6d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 844 additions and 71 deletions

View File

@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import javax.annotation.Nullable;
@ -55,21 +56,42 @@ public class JsonInputFormat extends NestedInputFormat
*/
private final boolean lineSplittable;
/**
* If true, overrides the behavior controlled by lineSplittable and the parsing will assume that the input
* is newline delimited. This is to allow streaming ingestion to use the newline delimited parser when the
* input is known to follow that format, since this allows for more flexible handling of parse errors (i.e.,
* an invalid JSON event on one line will not prevent other events on different lines from being ingested).
*/
private final boolean assumeNewlineDelimited;
/**
* If true, use the JsonNodeReader when parsing non-newline delimited JSON. This parser splits the input string
* into JsonNode objects, parsing each JsonNode into a separate InputRow, instead of parsing the input string
* into several InputRow at the same time. This allows for more flexible handling of parse errors, where timestamp
* parsing errors do not prevent other events in the same string from being ingested, and allows valid events prior to
* encountering invalid JSON syntax to be ingested as well.
*/
private final boolean useJsonNodeReader;
@JsonCreator
public JsonInputFormat(
@JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
@JsonProperty("featureSpec") @Nullable Map<String, Boolean> featureSpec,
@JsonProperty("keepNullColumns") @Nullable Boolean keepNullColumns
@JsonProperty("keepNullColumns") @Nullable Boolean keepNullColumns,
@JsonProperty("assumeNewlineDelimited") @Nullable Boolean assumeNewlineDelimited,
@JsonProperty("useJsonNodeReader") @Nullable Boolean useJsonNodeReader
)
{
this(flattenSpec, featureSpec, keepNullColumns, true);
this(flattenSpec, featureSpec, keepNullColumns, true, assumeNewlineDelimited, useJsonNodeReader);
}
public JsonInputFormat(
@Nullable JSONPathSpec flattenSpec,
Map<String, Boolean> featureSpec,
Boolean keepNullColumns,
boolean lineSplittable
boolean lineSplittable,
Boolean assumeNewlineDelimited,
Boolean useJsonNodeReader
)
{
super(flattenSpec);
@ -85,6 +107,11 @@ public class JsonInputFormat extends NestedInputFormat
objectMapper.configure(feature, entry.getValue());
}
this.lineSplittable = lineSplittable;
this.assumeNewlineDelimited = assumeNewlineDelimited != null && assumeNewlineDelimited;
this.useJsonNodeReader = useJsonNodeReader != null && useJsonNodeReader;
if (this.assumeNewlineDelimited && this.useJsonNodeReader) {
throw new IAE("useJsonNodeReader cannot be set to true when assumeNewlineDelimited is true.");
}
}
@JsonProperty
@ -100,6 +127,18 @@ public class JsonInputFormat extends NestedInputFormat
return keepNullColumns;
}
@JsonProperty
public boolean isAssumeNewlineDelimited()
{
return assumeNewlineDelimited;
}
@JsonProperty
public boolean isUseJsonNodeReader()
{
return useJsonNodeReader;
}
@Override
public boolean isSplittable()
{
@ -109,9 +148,13 @@ public class JsonInputFormat extends NestedInputFormat
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return this.lineSplittable ?
new JsonLineReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns) :
new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
if (this.lineSplittable || this.assumeNewlineDelimited) {
return new JsonLineReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
} else if (this.useJsonNodeReader) {
return new JsonNodeReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
} else {
return new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
}
}
/**
@ -124,7 +167,10 @@ public class JsonInputFormat extends NestedInputFormat
return new JsonInputFormat(this.getFlattenSpec(),
this.getFeatureSpec(),
this.keepNullColumns,
lineSplittable);
lineSplittable,
assumeNewlineDelimited,
useJsonNodeReader
);
}
@Override
@ -140,12 +186,23 @@ public class JsonInputFormat extends NestedInputFormat
return false;
}
JsonInputFormat that = (JsonInputFormat) o;
return this.lineSplittable == that.lineSplittable && Objects.equals(featureSpec, that.featureSpec) && Objects.equals(keepNullColumns, that.keepNullColumns);
return keepNullColumns == that.keepNullColumns
&& lineSplittable == that.lineSplittable
&& assumeNewlineDelimited == that.assumeNewlineDelimited
&& useJsonNodeReader == that.useJsonNodeReader
&& Objects.equals(featureSpec, that.featureSpec);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), featureSpec, keepNullColumns, lineSplittable);
return Objects.hash(
super.hashCode(),
featureSpec,
keepNullColumns,
lineSplittable,
assumeNewlineDelimited,
useJsonNodeReader
);
}
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* In contrast to {@link JsonLineReader} which processes input text line by line independently,
* this class tries to split the input into a list of JsonNode objects, and then parses each JsonNode independently
* into an InputRow.
*
* <p>
* The input text can be:
* 1. a JSON string of an object in a line or multiple lines(such as pretty-printed JSON text)
* 2. multiple JSON object strings concated by white space character(s)
* <p>
* If an input string contains invalid JSON syntax, any valid JSON objects found prior to encountering the invalid
* syntax will be successfully parsed, but parsing will not continue after the invalid syntax.
* <p>
*/
public class JsonNodeReader extends IntermediateRowParsingReader<JsonNode>
{
private final ObjectFlattener<JsonNode> flattener;
private final ObjectMapper mapper;
private final InputEntity source;
private final InputRowSchema inputRowSchema;
private final JsonFactory jsonFactory;
JsonNodeReader(
InputRowSchema inputRowSchema,
InputEntity source,
JSONPathSpec flattenSpec,
ObjectMapper mapper,
boolean keepNullColumns
)
{
this.inputRowSchema = inputRowSchema;
this.source = source;
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
this.mapper = mapper;
this.jsonFactory = new JsonFactory();
}
@Override
protected CloseableIterator<JsonNode> intermediateRowIterator() throws IOException
{
final String sourceString = IOUtils.toString(source.open(), StringUtils.UTF8_STRING);
final List<JsonNode> jsonNodes = new ArrayList<>();
try {
JsonParser parser = jsonFactory.createParser(sourceString);
final MappingIterator<JsonNode> delegate = mapper.readValues(parser, JsonNode.class);
while (delegate.hasNext()) {
jsonNodes.add(delegate.next());
}
}
catch (Exception e) {
//convert Jackson's JsonParseException into druid's exception for further processing
//JsonParseException will be thrown from MappingIterator#hasNext or MappingIterator#next when input json text is ill-formed
if (e.getCause() instanceof JsonParseException) {
jsonNodes.add(
new ParseExceptionMarkerJsonNode(
new ParseException(sourceString, e, "Unable to parse row [%s]", sourceString)
)
);
} else {
throw e;
}
}
if (CollectionUtils.isNullOrEmpty(jsonNodes)) {
jsonNodes.add(
new ParseExceptionMarkerJsonNode(
new ParseException(
sourceString,
"Unable to parse [%s] as the intermediateRow resulted in empty input row",
sourceString
)
)
);
}
return CloseableIterators.withEmptyBaggage(jsonNodes.iterator());
}
@Override
protected InputEntity source()
{
return source;
}
@Override
protected List<InputRow> parseInputRows(JsonNode intermediateRow) throws ParseException
{
if (intermediateRow instanceof ParseExceptionMarkerJsonNode) {
throw ((ParseExceptionMarkerJsonNode) intermediateRow).getParseException();
}
final List<InputRow> inputRows = Collections.singletonList(
MapInputRowParser.parse(inputRowSchema, flattener.flatten(intermediateRow))
);
if (CollectionUtils.isNullOrEmpty(inputRows)) {
throw new ParseException(
intermediateRow.toString(),
"Unable to parse [%s] as the intermediateRow resulted in empty input row",
intermediateRow.toString()
);
}
return inputRows;
}
@Override
protected List<Map<String, Object>> toMap(JsonNode intermediateRow) throws IOException
{
if (intermediateRow instanceof ParseExceptionMarkerJsonNode) {
throw ((ParseExceptionMarkerJsonNode) intermediateRow).getParseException();
}
return Collections.singletonList(
mapper.readValue(intermediateRow.toString(), new TypeReference<Map<String, Object>>()
{
})
);
}
private static class ParseExceptionMarkerJsonNode extends ObjectNode
{
final ParseException parseException;
public ParseExceptionMarkerJsonNode(ParseException pe)
{
super(null);
this.parseException = pe;
}
public ParseException getParseException()
{
return parseException;
}
}
}

View File

@ -131,7 +131,7 @@ public class CloudObjectInputSourceTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
@ -152,7 +152,7 @@ public class CloudObjectInputSourceTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
@ -173,7 +173,7 @@ public class CloudObjectInputSourceTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
@ -194,7 +194,7 @@ public class CloudObjectInputSourceTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);

View File

@ -52,7 +52,9 @@ public class JsonInputFormatTest
)
),
ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false),
true
true,
false,
false
);
final byte[] bytes = mapper.writeValueAsBytes(format);
final JsonInputFormat fromJson = (JsonInputFormat) mapper.readValue(bytes, InputFormat.class);
@ -79,6 +81,8 @@ public class JsonInputFormatTest
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(false, null),
null,
null,
null,
null
);
Assert.assertFalse(format.isKeepNullColumns());
@ -90,6 +94,8 @@ public class JsonInputFormatTest
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(true, null),
null,
null,
null,
null
);
Assert.assertTrue(format.isKeepNullColumns());
@ -101,7 +107,9 @@ public class JsonInputFormatTest
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(true, null),
null,
false
false,
null,
null
);
Assert.assertFalse(format.isKeepNullColumns());
}

View File

@ -56,6 +56,8 @@ public class JsonLineReaderTest
)
),
null,
null,
null,
null
);
@ -106,6 +108,8 @@ public class JsonLineReaderTest
)
),
null,
null,
null,
null
);
@ -148,7 +152,9 @@ public class JsonLineReaderTest
)
),
null,
true
true,
null,
null
);
final ByteEntity source = new ByteEntity(
@ -190,7 +196,9 @@ public class JsonLineReaderTest
)
),
null,
true
true,
null,
null
);
final ByteEntity source = new ByteEntity(
@ -232,7 +240,9 @@ public class JsonLineReaderTest
)
),
null,
false
false,
null,
null
);
final ByteEntity source = new ByteEntity(

View File

@ -0,0 +1,475 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
public class JsonNodeReaderTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testParseMultipleRows() throws IOException
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false, //make sure JsonReader is used,
false,
true
);
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":2}}\n"
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n")
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
ColumnsFilter.all()
),
source,
null
);
final int numExpectedIterations = 3;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
final String msgId = String.valueOf(++numActualIterations);
Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp());
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testParsePrettyFormatJSON() throws IOException
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false, //make sure JsonReader is used
false,
true
);
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\n"
+ " \"timestamp\": \"2019-01-01\",\n"
+ " \"bar\": null,\n"
+ " \"foo\": \"x\",\n"
+ " \"baz\": 4,\n"
+ " \"o\": {\n"
+ " \"mg\": 1\n"
+ " }\n"
+ "}")
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
ColumnsFilter.all()
),
source,
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp());
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testInvalidJSONText() throws IOException
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false, //make sure JsonReader is used
false,
true
);
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}"
//baz property is illegal
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}")
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
ColumnsFilter.all()
),
source,
null
);
//expect a ParseException on the following `next` call on iterator
expectedException.expect(ParseException.class);
// the 2nd line is ill-formed, so the parse of this text chunk aborts
final int numExpectedIterations = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
iterator.next();
++numActualIterations;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testSampleMultipleRows() throws IOException
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false, //make sure JsonReader is used
false,
true
);
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":2}}\n"
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n")
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
ColumnsFilter.all()
),
source,
null
);
int acturalRowCount = 0;
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
while (iterator.hasNext()) {
final InputRowListPlusRawValues rawValues = iterator.next();
// 1 row returned 3 times
Assert.assertEquals(1, rawValues.getInputRows().size());
InputRow row = rawValues.getInputRows().get(0);
final String msgId = String.valueOf(++acturalRowCount);
Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp());
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
}
}
Assert.assertEquals(3, acturalRowCount);
}
@Test
public void testSamplInvalidJSONText() throws IOException
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false, //make sure JsonReader is used
false,
true
);
//2nd row is has an invalid timestamp which causes a parse exception, but is valid JSON
//3rd row is malformed json and terminates the row iteration
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
+ "{\"timestamp\":\"invalidtimestamp\",\"bar\":null,\"foo\":\"x\",\"baz\":5,\"o\":{\"mg\":2}}\n"
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}\n"
//value of baz is invalid
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n")
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
ColumnsFilter.all()
),
source,
null
);
// the invalid timestamp in line 2 causes a parse exception, but because it is valid JSON, parsing can continue
// the invalid character in line 3 stops parsing of the 4-line text as a whole
// so the total num of iteration is 3
final int numExpectedIterations = 3;
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
numActualIterations++;
final InputRowListPlusRawValues rawValues = iterator.next();
if (numActualIterations == 2 || numActualIterations == 3) {
Assert.assertNotNull(rawValues.getParseException());
}
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testEmptyJSONText() throws IOException
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false, //make sure JsonReader is used
false,
true
);
//input is empty
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8(
"" // empty row
)
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
ColumnsFilter.all()
),
source,
null
);
//expect a ParseException on the following `next` call on iterator
expectedException.expect(ParseException.class);
// the 2nd line is ill-formed, so the parse of this text chunk aborts
final int numExpectedIterations = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
iterator.next();
++numActualIterations;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testSampleEmptyText() throws IOException
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false, //make sure JsonReader is used
false,
true
);
//input is empty
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("")
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
ColumnsFilter.all()
),
source,
null
);
// the total num of iteration is 1
final int numExpectedIterations = 1;
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
numActualIterations++;
final InputRowListPlusRawValues rawValues = iterator.next();
Assert.assertNotNull(rawValues.getParseException());
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
}

View File

@ -62,7 +62,9 @@ public class JsonReaderTest
),
null,
null,
false //make sure JsonReader is used
false, //make sure JsonReader is used
false,
false
);
final ByteEntity source = new ByteEntity(
@ -122,7 +124,9 @@ public class JsonReaderTest
),
null,
null,
false //make sure JsonReader is used
false, //make sure JsonReader is used
false,
false
);
final ByteEntity source = new ByteEntity(
@ -189,7 +193,9 @@ public class JsonReaderTest
),
null,
null,
false //make sure JsonReader is used
false, //make sure JsonReader is used
false,
false
);
final ByteEntity source = new ByteEntity(
@ -243,7 +249,9 @@ public class JsonReaderTest
),
null,
null,
false //make sure JsonReader is used
false, //make sure JsonReader is used
false,
false
);
final ByteEntity source = new ByteEntity(
@ -309,7 +317,9 @@ public class JsonReaderTest
),
null,
null,
false //make sure JsonReader is used
false, //make sure JsonReader is used
false,
false
);
//2nd row is ill-formed
@ -365,7 +375,9 @@ public class JsonReaderTest
),
null,
null,
false //make sure JsonReader is used
false, //make sure JsonReader is used
false,
false
);
//input is empty
@ -421,7 +433,9 @@ public class JsonReaderTest
),
null,
null,
false //make sure JsonReader is used
false, //make sure JsonReader is used
false,
false
);
//input is empty

View File

@ -93,6 +93,13 @@ Configure the JSON `inputFormat` to load JSON data as follows:
| flattenSpec | JSON Object | Specifies flattening configuration for nested JSON data. See [`flattenSpec`](#flattenspec) for more info. | no |
| featureSpec | JSON Object | [JSON parser features](https://github.com/FasterXML/jackson-core/wiki/JsonParser-Features) supported by Jackson, a JSON processor for Java. The features control parsing of the input JSON data. To enable a feature, map the feature name to a Boolean value of "true". For example: `"featureSpec": {"ALLOW_SINGLE_QUOTES": true, "ALLOW_UNQUOTED_FIELD_NAMES": true}` | no |
The following properties are specialized properties that only apply when the JSON `inputFormat` is used in streaming ingestion, and they are related to how parsing exceptions are handled. In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single JSON event spans multiple lines). However, if a parsing exception occurs, all JSON events that are present in the same streaming record will be discarded.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| assumeNewlineDelimited | Boolean | If the input is known to be newline delimited JSON (each individual JSON event is contained in a single line, separated by newlines), setting this option to true allows for more flexible parsing exception handling. Only the lines with invalid JSON syntax will be discarded, while lines containing valid JSON events will still be ingested. | no (Default false) |
| useJsonNodeReader | Boolean | When ingesting multi-line JSON events, enabling this option will enable the use of a JSON parser which will retain any valid JSON events encountered within a streaming record prior to when a parsing exception occurred. | no (Default false) |
For example:
```json
"ioConfig": {

View File

@ -327,7 +327,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
);
@ -353,7 +353,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
@ -380,7 +380,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null)
);
@ -410,7 +410,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
);
Assert.assertEquals(
@ -444,7 +444,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
);
inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
).collect(Collectors.toList());
}

View File

@ -159,7 +159,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
);
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
@ -178,7 +178,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
);
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
@ -229,7 +229,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
@ -251,7 +251,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null)
);

View File

@ -91,7 +91,8 @@ public class KafkaInputFormatTest
// Key Format
new JsonInputFormat(
new JSONPathSpec(true, ImmutableList.of()),
null, null, false //make sure JsonReader is used
null, null, false, //make sure JsonReader is used
false, false
),
// Value Format
new JsonInputFormat(
@ -106,7 +107,8 @@ public class KafkaInputFormatTest
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null, null, false //make sure JsonReader is used
null, null, false, //make sure JsonReader is used
false, false
),
"kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp"
);
@ -121,7 +123,8 @@ public class KafkaInputFormatTest
// Key Format
new JsonInputFormat(
new JSONPathSpec(true, ImmutableList.of()),
null, null, false //make sure JsonReader is used
null, null, false, //make sure JsonReader is used
false, false
),
// Value Format
new JsonInputFormat(
@ -136,7 +139,8 @@ public class KafkaInputFormatTest
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null, null, false //make sure JsonReader is used
null, null, false, //make sure JsonReader is used
false, false
),
"kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp"
);
@ -407,7 +411,8 @@ public class KafkaInputFormatTest
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null, null, false //make sure JsonReader is used
null, null, false, //make sure JsonReader is used
false, false
),
"kafka.newheader.", "kafka.newkey.", "kafka.newts."
);

View File

@ -130,7 +130,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
new KafkaSupervisorIOConfig(
TOPIC,
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
null,
@ -303,7 +303,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
new KafkaSupervisorIOConfig(
TOPIC,
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
null,

View File

@ -133,6 +133,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static final InputFormat INPUT_FORMAT = new JsonInputFormat(
new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(),
false,
false,
false
);
private static final String TOPIC_PREFIX = "testTopic";

View File

@ -135,7 +135,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
null,
new KinesisSupervisorIOConfig(
STREAM,
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
null,
null,
null,

View File

@ -122,6 +122,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
private static final InputFormat INPUT_FORMAT = new JsonInputFormat(
new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(),
false,
false,
false,
false
);
private static final String DATASOURCE = "testDS";

View File

@ -229,7 +229,7 @@ public class DataSourcePlan
return forExternal(
new ExternalDataSource(
dataString.isEmpty() ? NilInputSource.instance() : new InlineInputSource(dataString),
new JsonInputFormat(null, null, null),
new JsonInputFormat(null, null, null, null, null),
signature
),
broadcast

View File

@ -625,7 +625,7 @@ public class MSQSelectTest extends MSQTestBase
.setDataSource(
new ExternalDataSource(
new LocalInputSource(null, null, ImmutableList.of(toRead.getAbsoluteFile())),
new JsonInputFormat(null, null, null),
new JsonInputFormat(null, null, null, null, null),
RowSignature.builder()
.add("timestamp", ColumnType.STRING)
.add("page", ColumnType.STRING)

View File

@ -82,7 +82,7 @@ public class MSQWarningsTest extends MSQTestBase
toRead.getAbsoluteFile()
)
),
new JsonInputFormat(null, null, null),
new JsonInputFormat(null, null, null, null, null),
RowSignature.builder()
.add("timestamp", ColumnType.STRING)
.add("page", ColumnType.STRING)

View File

@ -49,7 +49,7 @@ import java.util.stream.Stream;
public class ExternalInputSpecSlicerTest
{
static final InputFormat INPUT_FORMAT = new JsonInputFormat(null, null, null);
static final InputFormat INPUT_FORMAT = new JsonInputFormat(null, null, null, null, null);
static final RowSignature SIGNATURE = RowSignature.builder().add("s", ColumnType.STRING).build();
private ExternalInputSpecSlicer slicer;

View File

@ -663,7 +663,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
);
@ -688,7 +688,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
);
@ -713,7 +713,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
);
@ -738,7 +738,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
);
@ -768,7 +768,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
@ -799,7 +799,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
@ -830,7 +830,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null)
);
@ -864,7 +864,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
);
Assert.assertEquals(
@ -902,7 +902,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
).collect(Collectors.toList());
}

View File

@ -1604,7 +1604,7 @@ public class IndexTaskTest extends IngestionTestBase
tmpDir,
timestampSpec,
dimensionsSpec,
new JsonInputFormat(null, null, null),
new JsonInputFormat(null, null, null, null, null),
null,
null,
tuningConfig,

View File

@ -80,7 +80,7 @@ public class MultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiP
private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
);
private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null);
private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null, null, null);
private static final List<Interval> INTERVAL_TO_INDEX = Collections.singletonList(Intervals.of("2022-01/P1M"));
@Parameterized.Parameters
@ -194,6 +194,8 @@ public class MultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiP
new JsonInputFormat(
new JSONPathSpec(true, null),
null,
null,
null,
null
),
false,
@ -259,6 +261,8 @@ public class MultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiP
)
),
null,
null,
null,
null
),
false,

View File

@ -227,7 +227,7 @@ public class ParallelIndexSupervisorTaskTest
final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
null,
new InlineInputSource("test"),
new JsonInputFormat(null, null, null),
new JsonInputFormat(null, null, null, null, null),
appendToExisting,
null
);

View File

@ -289,6 +289,6 @@ class ParallelIndexTestingFactory
static InputFormat getInputFormat()
{
return new JsonInputFormat(null, null, null);
return new JsonInputFormat(null, null, null, null, null);
}
}

View File

@ -47,7 +47,7 @@ public class PartialHashSegmentGenerateTaskTest
private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
private static final ParallelIndexIngestionSpec INGESTION_SPEC = ParallelIndexTestingFactory.createIngestionSpec(
new LocalInputSource(new File("baseDir"), "filer"),
new JsonInputFormat(null, null, null),
new JsonInputFormat(null, null, null, null, null),
new ParallelIndexTestingFactory.TuningConfigBuilder().build(),
ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS)
);
@ -161,7 +161,7 @@ public class PartialHashSegmentGenerateTaskTest
ParallelIndexTestingFactory.NUM_ATTEMPTS,
ParallelIndexTestingFactory.createIngestionSpec(
new LocalInputSource(new File("baseDir"), "filer"),
new JsonInputFormat(null, null, null),
new JsonInputFormat(null, null, null, null, null),
new ParallelIndexTestingFactory.TuningConfigBuilder().build(),
ParallelIndexTestingFactory.createDataSchema(null)
),

View File

@ -851,6 +851,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
new JsonInputFormat(
new JSONPathSpec(true, null),
null,
null,
null,
null
),
false,

View File

@ -54,7 +54,7 @@ public class SinglePhaseSubTaskSpecTest
new ParallelIndexIOConfig(
null,
new LocalInputSource(new File("baseDir"), "filter"),
new JsonInputFormat(null, null, null),
new JsonInputFormat(null, null, null, null, null),
null,
null
),

View File

@ -1463,7 +1463,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
{
switch (parserType) {
case STR_JSON:
return new JsonInputFormat(null, null, null);
return new JsonInputFormat(null, null, null, null, null);
case STR_CSV:
return new CsvInputFormat(ImmutableList.of("t", "dim1", "dim2", "met1"), null, null, false, 0);
default:

View File

@ -128,6 +128,8 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
protected static final InputFormat INPUT_FORMAT = new JsonInputFormat(
new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(),
null,
null,
null
);
protected static final Logger LOG = new Logger(SeekableStreamIndexTaskTestBase.class);

View File

@ -856,7 +856,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
{
SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
1,
new Period("PT1H"),
@ -927,7 +927,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
if (scaleOut) {
return new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
taskCount,
new Period("PT1H"),
@ -944,7 +944,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
} else {
return new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
taskCount,
new Period("PT1H"),

View File

@ -106,7 +106,7 @@ public class StreamChunkParserTest
@Test
public void testWithNullParserAndInputformatParseProperly() throws IOException
{
final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap(), null);
final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap(), null, null, null);
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
null,
inputFormat,
@ -237,7 +237,7 @@ public class StreamChunkParserTest
private TrackingJsonInputFormat(@Nullable JSONPathSpec flattenSpec,
@Nullable Map<String, Boolean> featureSpec)
{
super(flattenSpec, featureSpec, null);
super(flattenSpec, featureSpec, null, null, null);
props = new Props();
}
@ -246,7 +246,7 @@ public class StreamChunkParserTest
boolean lineSplittable,
Props props)
{
super(flattenSpec, featureSpec, null, lineSplittable);
super(flattenSpec, featureSpec, null, lineSplittable, null, null);
this.props = props;
}

View File

@ -897,7 +897,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
1,
new Period("PT1H"),
@ -955,7 +955,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
return new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
1,
new Period("PT1H"),

View File

@ -216,6 +216,7 @@ aggregators
ambari
analytics
arrayElement
assumeNewlineDelimited
assumeRoleArn
assumeRoleExternalId
async
@ -489,6 +490,7 @@ unsetting
untrusted
useFilterCNF
useJqSyntax
useJsonNodeReader
useSSL
uptime
uris