diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index 5e18ddf58ac..041942c9e62 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import javax.annotation.Nullable; @@ -55,21 +56,42 @@ public class JsonInputFormat extends NestedInputFormat */ private final boolean lineSplittable; + /** + * If true, overrides the behavior controlled by lineSplittable and the parsing will assume that the input + * is newline delimited. This is to allow streaming ingestion to use the newline delimited parser when the + * input is known to follow that format, since this allows for more flexible handling of parse errors (i.e., + * an invalid JSON event on one line will not prevent other events on different lines from being ingested). + */ + private final boolean assumeNewlineDelimited; + + /** + * If true, use the JsonNodeReader when parsing non-newline delimited JSON. This parser splits the input string + * into JsonNode objects, parsing each JsonNode into a separate InputRow, instead of parsing the input string + * into several InputRow at the same time. This allows for more flexible handling of parse errors, where timestamp + * parsing errors do not prevent other events in the same string from being ingested, and allows valid events prior to + * encountering invalid JSON syntax to be ingested as well. + */ + private final boolean useJsonNodeReader; + @JsonCreator public JsonInputFormat( @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, @JsonProperty("featureSpec") @Nullable Map featureSpec, - @JsonProperty("keepNullColumns") @Nullable Boolean keepNullColumns + @JsonProperty("keepNullColumns") @Nullable Boolean keepNullColumns, + @JsonProperty("assumeNewlineDelimited") @Nullable Boolean assumeNewlineDelimited, + @JsonProperty("useJsonNodeReader") @Nullable Boolean useJsonNodeReader ) { - this(flattenSpec, featureSpec, keepNullColumns, true); + this(flattenSpec, featureSpec, keepNullColumns, true, assumeNewlineDelimited, useJsonNodeReader); } public JsonInputFormat( @Nullable JSONPathSpec flattenSpec, Map featureSpec, Boolean keepNullColumns, - boolean lineSplittable + boolean lineSplittable, + Boolean assumeNewlineDelimited, + Boolean useJsonNodeReader ) { super(flattenSpec); @@ -85,6 +107,11 @@ public class JsonInputFormat extends NestedInputFormat objectMapper.configure(feature, entry.getValue()); } this.lineSplittable = lineSplittable; + this.assumeNewlineDelimited = assumeNewlineDelimited != null && assumeNewlineDelimited; + this.useJsonNodeReader = useJsonNodeReader != null && useJsonNodeReader; + if (this.assumeNewlineDelimited && this.useJsonNodeReader) { + throw new IAE("useJsonNodeReader cannot be set to true when assumeNewlineDelimited is true."); + } } @JsonProperty @@ -100,6 +127,18 @@ public class JsonInputFormat extends NestedInputFormat return keepNullColumns; } + @JsonProperty + public boolean isAssumeNewlineDelimited() + { + return assumeNewlineDelimited; + } + + @JsonProperty + public boolean isUseJsonNodeReader() + { + return useJsonNodeReader; + } + @Override public boolean isSplittable() { @@ -109,9 +148,13 @@ public class JsonInputFormat extends NestedInputFormat @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return this.lineSplittable ? - new JsonLineReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns) : - new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns); + if (this.lineSplittable || this.assumeNewlineDelimited) { + return new JsonLineReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns); + } else if (this.useJsonNodeReader) { + return new JsonNodeReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns); + } else { + return new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns); + } } /** @@ -124,7 +167,10 @@ public class JsonInputFormat extends NestedInputFormat return new JsonInputFormat(this.getFlattenSpec(), this.getFeatureSpec(), this.keepNullColumns, - lineSplittable); + lineSplittable, + assumeNewlineDelimited, + useJsonNodeReader + ); } @Override @@ -140,12 +186,23 @@ public class JsonInputFormat extends NestedInputFormat return false; } JsonInputFormat that = (JsonInputFormat) o; - return this.lineSplittable == that.lineSplittable && Objects.equals(featureSpec, that.featureSpec) && Objects.equals(keepNullColumns, that.keepNullColumns); + return keepNullColumns == that.keepNullColumns + && lineSplittable == that.lineSplittable + && assumeNewlineDelimited == that.assumeNewlineDelimited + && useJsonNodeReader == that.useJsonNodeReader + && Objects.equals(featureSpec, that.featureSpec); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), featureSpec, keepNullColumns, lineSplittable); + return Objects.hash( + super.hashCode(), + featureSpec, + keepNullColumns, + lineSplittable, + assumeNewlineDelimited, + useJsonNodeReader + ); } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java new file mode 100644 index 00000000000..7a7b98c4529 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.io.IOUtils; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ObjectFlattener; +import org.apache.druid.java.util.common.parsers.ObjectFlatteners; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.CollectionUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * In contrast to {@link JsonLineReader} which processes input text line by line independently, + * this class tries to split the input into a list of JsonNode objects, and then parses each JsonNode independently + * into an InputRow. + * + *

+ * The input text can be: + * 1. a JSON string of an object in a line or multiple lines(such as pretty-printed JSON text) + * 2. multiple JSON object strings concated by white space character(s) + *

+ * If an input string contains invalid JSON syntax, any valid JSON objects found prior to encountering the invalid + * syntax will be successfully parsed, but parsing will not continue after the invalid syntax. + *

+ */ +public class JsonNodeReader extends IntermediateRowParsingReader +{ + private final ObjectFlattener flattener; + private final ObjectMapper mapper; + private final InputEntity source; + private final InputRowSchema inputRowSchema; + private final JsonFactory jsonFactory; + + JsonNodeReader( + InputRowSchema inputRowSchema, + InputEntity source, + JSONPathSpec flattenSpec, + ObjectMapper mapper, + boolean keepNullColumns + ) + { + this.inputRowSchema = inputRowSchema; + this.source = source; + this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns)); + this.mapper = mapper; + this.jsonFactory = new JsonFactory(); + } + + @Override + protected CloseableIterator intermediateRowIterator() throws IOException + { + final String sourceString = IOUtils.toString(source.open(), StringUtils.UTF8_STRING); + final List jsonNodes = new ArrayList<>(); + try { + JsonParser parser = jsonFactory.createParser(sourceString); + final MappingIterator delegate = mapper.readValues(parser, JsonNode.class); + while (delegate.hasNext()) { + jsonNodes.add(delegate.next()); + } + } + catch (Exception e) { + //convert Jackson's JsonParseException into druid's exception for further processing + //JsonParseException will be thrown from MappingIterator#hasNext or MappingIterator#next when input json text is ill-formed + if (e.getCause() instanceof JsonParseException) { + jsonNodes.add( + new ParseExceptionMarkerJsonNode( + new ParseException(sourceString, e, "Unable to parse row [%s]", sourceString) + ) + ); + } else { + throw e; + } + } + + if (CollectionUtils.isNullOrEmpty(jsonNodes)) { + jsonNodes.add( + new ParseExceptionMarkerJsonNode( + new ParseException( + sourceString, + "Unable to parse [%s] as the intermediateRow resulted in empty input row", + sourceString + ) + ) + ); + } + return CloseableIterators.withEmptyBaggage(jsonNodes.iterator()); + } + + @Override + protected InputEntity source() + { + return source; + } + + @Override + protected List parseInputRows(JsonNode intermediateRow) throws ParseException + { + if (intermediateRow instanceof ParseExceptionMarkerJsonNode) { + throw ((ParseExceptionMarkerJsonNode) intermediateRow).getParseException(); + } + final List inputRows = Collections.singletonList( + MapInputRowParser.parse(inputRowSchema, flattener.flatten(intermediateRow)) + ); + + if (CollectionUtils.isNullOrEmpty(inputRows)) { + throw new ParseException( + intermediateRow.toString(), + "Unable to parse [%s] as the intermediateRow resulted in empty input row", + intermediateRow.toString() + ); + } + return inputRows; + } + + @Override + protected List> toMap(JsonNode intermediateRow) throws IOException + { + if (intermediateRow instanceof ParseExceptionMarkerJsonNode) { + throw ((ParseExceptionMarkerJsonNode) intermediateRow).getParseException(); + } + return Collections.singletonList( + mapper.readValue(intermediateRow.toString(), new TypeReference>() + { + }) + ); + } + + private static class ParseExceptionMarkerJsonNode extends ObjectNode + { + final ParseException parseException; + + public ParseExceptionMarkerJsonNode(ParseException pe) + { + super(null); + this.parseException = pe; + } + + public ParseException getParseException() + { + return parseException; + } + } +} diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java index 70421c6afaf..7555e6afa59 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java @@ -131,7 +131,7 @@ public class CloudObjectInputSourceTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -152,7 +152,7 @@ public class CloudObjectInputSourceTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -173,7 +173,7 @@ public class CloudObjectInputSourceTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -194,7 +194,7 @@ public class CloudObjectInputSourceTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java index 0577dd72fb4..d7afec9dffb 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java @@ -52,7 +52,9 @@ public class JsonInputFormatTest ) ), ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false), - true + true, + false, + false ); final byte[] bytes = mapper.writeValueAsBytes(format); final JsonInputFormat fromJson = (JsonInputFormat) mapper.readValue(bytes, InputFormat.class); @@ -79,6 +81,8 @@ public class JsonInputFormatTest final JsonInputFormat format = new JsonInputFormat( new JSONPathSpec(false, null), null, + null, + null, null ); Assert.assertFalse(format.isKeepNullColumns()); @@ -90,6 +94,8 @@ public class JsonInputFormatTest final JsonInputFormat format = new JsonInputFormat( new JSONPathSpec(true, null), null, + null, + null, null ); Assert.assertTrue(format.isKeepNullColumns()); @@ -101,7 +107,9 @@ public class JsonInputFormatTest final JsonInputFormat format = new JsonInputFormat( new JSONPathSpec(true, null), null, - false + false, + null, + null ); Assert.assertFalse(format.isKeepNullColumns()); } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java index c61ad4921bb..18c00578e29 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java @@ -56,6 +56,8 @@ public class JsonLineReaderTest ) ), null, + null, + null, null ); @@ -106,6 +108,8 @@ public class JsonLineReaderTest ) ), null, + null, + null, null ); @@ -148,7 +152,9 @@ public class JsonLineReaderTest ) ), null, - true + true, + null, + null ); final ByteEntity source = new ByteEntity( @@ -190,7 +196,9 @@ public class JsonLineReaderTest ) ), null, - true + true, + null, + null ); final ByteEntity source = new ByteEntity( @@ -232,7 +240,9 @@ public class JsonLineReaderTest ) ), null, - false + false, + null, + null ); final ByteEntity source = new ByteEntity( diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonNodeReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonNodeReaderTest.java new file mode 100644 index 00000000000..50c372ff3d6 --- /dev/null +++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonNodeReaderTest.java @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; + +public class JsonNodeReaderTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testParseMultipleRows() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used, + false, + true + ); + + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":2}}\n" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + final int numExpectedIterations = 3; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + + final String msgId = String.valueOf(++numActualIterations); + Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp()); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("jq_omg"))); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testParsePrettyFormatJSON() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("{\n" + + " \"timestamp\": \"2019-01-01\",\n" + + " \"bar\": null,\n" + + " \"foo\": \"x\",\n" + + " \"baz\": 4,\n" + + " \"o\": {\n" + + " \"mg\": 1\n" + + " }\n" + + "}") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + + Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp()); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testInvalidJSONText() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}" + //baz property is illegal + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + //expect a ParseException on the following `next` call on iterator + expectedException.expect(ParseException.class); + + // the 2nd line is ill-formed, so the parse of this text chunk aborts + final int numExpectedIterations = 0; + + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + iterator.next(); + ++numActualIterations; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testSampleMultipleRows() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":2}}\n" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + int acturalRowCount = 0; + try (CloseableIterator iterator = reader.sample()) { + while (iterator.hasNext()) { + + final InputRowListPlusRawValues rawValues = iterator.next(); + + // 1 row returned 3 times + Assert.assertEquals(1, rawValues.getInputRows().size()); + InputRow row = rawValues.getInputRows().get(0); + + final String msgId = String.valueOf(++acturalRowCount); + Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp()); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("jq_omg"))); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + } + } + + Assert.assertEquals(3, acturalRowCount); + } + + @Test + public void testSamplInvalidJSONText() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + //2nd row is has an invalid timestamp which causes a parse exception, but is valid JSON + //3rd row is malformed json and terminates the row iteration + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}" + + "{\"timestamp\":\"invalidtimestamp\",\"bar\":null,\"foo\":\"x\",\"baz\":5,\"o\":{\"mg\":2}}\n" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}\n" + //value of baz is invalid + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + // the invalid timestamp in line 2 causes a parse exception, but because it is valid JSON, parsing can continue + // the invalid character in line 3 stops parsing of the 4-line text as a whole + // so the total num of iteration is 3 + final int numExpectedIterations = 3; + + try (CloseableIterator iterator = reader.sample()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + numActualIterations++; + + final InputRowListPlusRawValues rawValues = iterator.next(); + + if (numActualIterations == 2 || numActualIterations == 3) { + Assert.assertNotNull(rawValues.getParseException()); + } + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testEmptyJSONText() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + //input is empty + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8( + "" // empty row + ) + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + //expect a ParseException on the following `next` call on iterator + expectedException.expect(ParseException.class); + + // the 2nd line is ill-formed, so the parse of this text chunk aborts + final int numExpectedIterations = 0; + + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + iterator.next(); + ++numActualIterations; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + + + @Test + public void testSampleEmptyText() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + //input is empty + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + // the total num of iteration is 1 + final int numExpectedIterations = 1; + + try (CloseableIterator iterator = reader.sample()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + numActualIterations++; + + final InputRowListPlusRawValues rawValues = iterator.next(); + + Assert.assertNotNull(rawValues.getParseException()); + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } +} diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java index 7ab52a095d5..3374844ecfc 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java @@ -62,7 +62,9 @@ public class JsonReaderTest ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); final ByteEntity source = new ByteEntity( @@ -122,7 +124,9 @@ public class JsonReaderTest ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); final ByteEntity source = new ByteEntity( @@ -189,7 +193,9 @@ public class JsonReaderTest ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); final ByteEntity source = new ByteEntity( @@ -243,7 +249,9 @@ public class JsonReaderTest ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); final ByteEntity source = new ByteEntity( @@ -309,7 +317,9 @@ public class JsonReaderTest ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); //2nd row is ill-formed @@ -365,7 +375,9 @@ public class JsonReaderTest ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); //input is empty @@ -421,7 +433,9 @@ public class JsonReaderTest ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); //input is empty diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index e1b59faa58d..22c10276472 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -93,6 +93,13 @@ Configure the JSON `inputFormat` to load JSON data as follows: | flattenSpec | JSON Object | Specifies flattening configuration for nested JSON data. See [`flattenSpec`](#flattenspec) for more info. | no | | featureSpec | JSON Object | [JSON parser features](https://github.com/FasterXML/jackson-core/wiki/JsonParser-Features) supported by Jackson, a JSON processor for Java. The features control parsing of the input JSON data. To enable a feature, map the feature name to a Boolean value of "true". For example: `"featureSpec": {"ALLOW_SINGLE_QUOTES": true, "ALLOW_UNQUOTED_FIELD_NAMES": true}` | no | +The following properties are specialized properties that only apply when the JSON `inputFormat` is used in streaming ingestion, and they are related to how parsing exceptions are handled. In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single JSON event spans multiple lines). However, if a parsing exception occurs, all JSON events that are present in the same streaming record will be discarded. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| assumeNewlineDelimited | Boolean | If the input is known to be newline delimited JSON (each individual JSON event is contained in a single line, separated by newlines), setting this option to true allows for more flexible parsing exception handling. Only the lines with invalid JSON syntax will be discarded, while lines containing valid JSON events will still be ingested. | no (Default false) | +| useJsonNodeReader | Boolean | When ingesting multi-line JSON events, enabling this option will enable the use of a JSON parser which will retain any valid JSON events encountered within a streaming record prior to when a parsing exception occurred. | no (Default false) | + For example: ```json "ioConfig": { diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java index 6a677325a33..817167df38a 100644 --- a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java +++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java @@ -327,7 +327,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); @@ -353,7 +353,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -380,7 +380,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null) ); @@ -410,7 +410,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); Assert.assertEquals( @@ -444,7 +444,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest ); inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ).collect(Collectors.toList()); } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index dba662a9119..caebb4229e8 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -159,7 +159,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList())); @@ -178,7 +178,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList())); @@ -229,7 +229,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -251,7 +251,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null) ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index 86194d8b098..fe0b89e996f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -91,7 +91,8 @@ public class KafkaInputFormatTest // Key Format new JsonInputFormat( new JSONPathSpec(true, ImmutableList.of()), - null, null, false //make sure JsonReader is used + null, null, false, //make sure JsonReader is used + false, false ), // Value Format new JsonInputFormat( @@ -106,7 +107,8 @@ public class KafkaInputFormatTest new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") ) ), - null, null, false //make sure JsonReader is used + null, null, false, //make sure JsonReader is used + false, false ), "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp" ); @@ -121,7 +123,8 @@ public class KafkaInputFormatTest // Key Format new JsonInputFormat( new JSONPathSpec(true, ImmutableList.of()), - null, null, false //make sure JsonReader is used + null, null, false, //make sure JsonReader is used + false, false ), // Value Format new JsonInputFormat( @@ -136,7 +139,8 @@ public class KafkaInputFormatTest new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") ) ), - null, null, false //make sure JsonReader is used + null, null, false, //make sure JsonReader is used + false, false ), "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp" ); @@ -407,7 +411,8 @@ public class KafkaInputFormatTest new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") ) ), - null, null, false //make sure JsonReader is used + null, null, false, //make sure JsonReader is used + false, false ), "kafka.newheader.", "kafka.newkey.", "kafka.newts." ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index 24aa2a7dd43..89b824778d8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -130,7 +130,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, new KafkaSupervisorIOConfig( TOPIC, - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null, null, null, @@ -303,7 +303,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, new KafkaSupervisorIOConfig( TOPIC, - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null, null, null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 0e01702240b..9009fd80d3b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -133,6 +133,8 @@ public class KafkaSupervisorTest extends EasyMockSupport private static final InputFormat INPUT_FORMAT = new JsonInputFormat( new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), + false, + false, false ); private static final String TOPIC_PREFIX = "testTopic"; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index 9a446f5265e..9e7d76edddb 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -135,7 +135,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport null, new KinesisSupervisorIOConfig( STREAM, - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), null, null, null, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 42866c23107..99d8d5223b7 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -122,6 +122,9 @@ public class KinesisSupervisorTest extends EasyMockSupport private static final InputFormat INPUT_FORMAT = new JsonInputFormat( new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), + false, + false, + false, false ); private static final String DATASOURCE = "testDS"; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 2bd59a7f9cd..fc715d3544b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -229,7 +229,7 @@ public class DataSourcePlan return forExternal( new ExternalDataSource( dataString.isEmpty() ? NilInputSource.instance() : new InlineInputSource(dataString), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), signature ), broadcast diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 12b353268ad..9510345ab1f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -625,7 +625,7 @@ public class MSQSelectTest extends MSQTestBase .setDataSource( new ExternalDataSource( new LocalInputSource(null, null, ImmutableList.of(toRead.getAbsoluteFile())), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), RowSignature.builder() .add("timestamp", ColumnType.STRING) .add("page", ColumnType.STRING) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java index e25867b059b..c41786ce3f2 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java @@ -82,7 +82,7 @@ public class MSQWarningsTest extends MSQTestBase toRead.getAbsoluteFile() ) ), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), RowSignature.builder() .add("timestamp", ColumnType.STRING) .add("page", ColumnType.STRING) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java index 186cb0b9082..23728beb44d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java @@ -49,7 +49,7 @@ import java.util.stream.Stream; public class ExternalInputSpecSlicerTest { - static final InputFormat INPUT_FORMAT = new JsonInputFormat(null, null, null); + static final InputFormat INPUT_FORMAT = new JsonInputFormat(null, null, null, null, null); static final RowSignature SIGNATURE = RowSignature.builder().add("s", ColumnType.STRING).build(); private ExternalInputSpecSlicer slicer; diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index afca41d61f7..0836f5a5daa 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -663,7 +663,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); @@ -688,7 +688,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); @@ -713,7 +713,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); @@ -738,7 +738,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); @@ -768,7 +768,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -799,7 +799,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -830,7 +830,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null) ); @@ -864,7 +864,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); Assert.assertEquals( @@ -902,7 +902,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest ); inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ).collect(Collectors.toList()); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 125e8f1be80..095630a658c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1604,7 +1604,7 @@ public class IndexTaskTest extends IngestionTestBase tmpDir, timestampSpec, dimensionsSpec, - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), null, null, tuningConfig, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java index 2253d9ac334..22a006bd6c2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java @@ -80,7 +80,7 @@ public class MultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiP private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec( DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2")) ); - private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null); + private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null, null, null); private static final List INTERVAL_TO_INDEX = Collections.singletonList(Intervals.of("2022-01/P1M")); @Parameterized.Parameters @@ -194,6 +194,8 @@ public class MultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiP new JsonInputFormat( new JSONPathSpec(true, null), null, + null, + null, null ), false, @@ -259,6 +261,8 @@ public class MultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiP ) ), null, + null, + null, null ), false, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 21291455656..c91c9772e12 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -227,7 +227,7 @@ public class ParallelIndexSupervisorTaskTest final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( null, new InlineInputSource("test"), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), appendToExisting, null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java index 150d0b2c180..7f315ab3304 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java @@ -289,6 +289,6 @@ class ParallelIndexTestingFactory static InputFormat getInputFormat() { - return new JsonInputFormat(null, null, null); + return new JsonInputFormat(null, null, null, null, null); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java index 48abc95e08d..749921fff3c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java @@ -47,7 +47,7 @@ public class PartialHashSegmentGenerateTaskTest private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper(); private static final ParallelIndexIngestionSpec INGESTION_SPEC = ParallelIndexTestingFactory.createIngestionSpec( new LocalInputSource(new File("baseDir"), "filer"), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), new ParallelIndexTestingFactory.TuningConfigBuilder().build(), ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS) ); @@ -161,7 +161,7 @@ public class PartialHashSegmentGenerateTaskTest ParallelIndexTestingFactory.NUM_ATTEMPTS, ParallelIndexTestingFactory.createIngestionSpec( new LocalInputSource(new File("baseDir"), "filer"), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), new ParallelIndexTestingFactory.TuningConfigBuilder().build(), ParallelIndexTestingFactory.createDataSchema(null) ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 031dd68d4cd..e1907c9dcab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -851,6 +851,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv new JsonInputFormat( new JSONPathSpec(true, null), null, + null, + null, null ), false, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java index c28d43c3858..d42f7e69200 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java @@ -54,7 +54,7 @@ public class SinglePhaseSubTaskSpecTest new ParallelIndexIOConfig( null, new LocalInputSource(new File("baseDir"), "filter"), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), null, null ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index 50e2f63c0f8..af612572b5b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -1463,7 +1463,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest { switch (parserType) { case STR_JSON: - return new JsonInputFormat(null, null, null); + return new JsonInputFormat(null, null, null, null, null); case STR_CSV: return new CsvInputFormat(ImmutableList.of("t", "dim1", "dim2", "met1"), null, null, false, 0); default: diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 92bb04d6cb3..e93510ad740 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -128,6 +128,8 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport protected static final InputFormat INPUT_FORMAT = new JsonInputFormat( new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), + null, + null, null ); protected static final Logger LOG = new Logger(SeekableStreamIndexTaskTestBase.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 7cdbb7481af..7116b881374 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -856,7 +856,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport { SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new SeekableStreamSupervisorIOConfig( "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, 1, new Period("PT1H"), @@ -927,7 +927,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport if (scaleOut) { return new SeekableStreamSupervisorIOConfig( "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, taskCount, new Period("PT1H"), @@ -944,7 +944,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport } else { return new SeekableStreamSupervisorIOConfig( "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, taskCount, new Period("PT1H"), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java index 1cab704a2ef..f3485e06f48 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java @@ -106,7 +106,7 @@ public class StreamChunkParserTest @Test public void testWithNullParserAndInputformatParseProperly() throws IOException { - final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap(), null); + final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap(), null, null, null); final StreamChunkParser chunkParser = new StreamChunkParser<>( null, inputFormat, @@ -237,7 +237,7 @@ public class StreamChunkParserTest private TrackingJsonInputFormat(@Nullable JSONPathSpec flattenSpec, @Nullable Map featureSpec) { - super(flattenSpec, featureSpec, null); + super(flattenSpec, featureSpec, null, null, null); props = new Props(); } @@ -246,7 +246,7 @@ public class StreamChunkParserTest boolean lineSplittable, Props props) { - super(flattenSpec, featureSpec, null, lineSplittable); + super(flattenSpec, featureSpec, null, lineSplittable, null, null); this.props = props; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 5566a5e95d2..8d56b757df2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -897,7 +897,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig( "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, 1, new Period("PT1H"), @@ -955,7 +955,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport { return new SeekableStreamSupervisorIOConfig( "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, 1, new Period("PT1H"), diff --git a/website/.spelling b/website/.spelling index 1440b89421f..0b13916f030 100644 --- a/website/.spelling +++ b/website/.spelling @@ -216,6 +216,7 @@ aggregators ambari analytics arrayElement +assumeNewlineDelimited assumeRoleArn assumeRoleExternalId async @@ -489,6 +490,7 @@ unsetting untrusted useFilterCNF useJqSyntax +useJsonNodeReader useSSL uptime uris