diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java index 12996670574..9f4769b8766 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.indexing.seekablestream.SettableByteEntity; @@ -109,12 +110,12 @@ public class KafkaInputFormat implements InputFormat record -> (record.getRecord().key() == null) ? 
null : - keyFormat.createReader( + JsonInputFormat.withLineSplittable(keyFormat, false).createReader( newInputRowSchema, new ByteEntity(record.getRecord().key()), temporaryDirectory ), - valueFormat.createReader( + JsonInputFormat.withLineSplittable(valueFormat, false).createReader( newInputRowSchema, source, temporaryDirectory diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index 21a0550f53e..858cd79fbd7 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -102,7 +102,6 @@ public class KafkaInputFormatTest new JSONPathSpec(true, ImmutableList.of()), null, null, - false, //make sure JsonReader is used false, false ), @@ -121,7 +120,6 @@ public class KafkaInputFormatTest ), null, null, - false, //make sure JsonReader is used false, false ), @@ -143,7 +141,6 @@ public class KafkaInputFormatTest new JSONPathSpec(true, ImmutableList.of()), null, null, - false, //make sure JsonReader is used false, false ), @@ -162,7 +159,6 @@ public class KafkaInputFormatTest ), null, null, - false, //make sure JsonReader is used false, false ), @@ -485,8 +481,10 @@ public class KafkaInputFormatTest new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") ) ), - null, null, false, //make sure JsonReader is used - false, false + null, + null, + false, + false ), "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic." 
); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index f38697c8180..c9b6406bd3f 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -125,7 +125,6 @@ public class KinesisSupervisorTest extends EasyMockSupport ImmutableMap.of(), false, false, - false, false ); private static final String DATASOURCE = "testDS"; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java index d78f576681e..b9bef3c7314 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java @@ -121,7 +121,7 @@ public class RecordSupplierInputSource implements InputEntityReade ) { Preconditions.checkNotNull(inputFormat, "inputFormat"); - final InputFormat format = (inputFormat instanceof JsonInputFormat) ? 
((JsonInputFormat) inputFormat).withLineSplittable(false) : inputFormat; + final InputFormat format = JsonInputFormat.withLineSplittable(inputFormat, false); this.entity = new SettableByteEntity<>(); this.delegate = new TransformingInputEntityReader( format.createReader(inputRowSchema, entity, indexingTmpDir), diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index 07f658f99e1..5bab7f6d63e 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.JsonParser.Feature; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -121,6 +122,11 @@ public class JsonInputFormat extends NestedInputFormat return featureSpec; } + boolean isLineSplittable() + { + return lineSplittable; + } + @JsonProperty // No @JsonInclude, since default is variable, so we can't assume false is default public boolean isKeepNullColumns() { @@ -227,4 +233,19 @@ public class JsonInputFormat extends NestedInputFormat ", useJsonNodeReader=" + useJsonNodeReader + '}'; } + + /** + * If the provided format is {@link JsonInputFormat}, return a version with {@link #withLineSplittable(boolean)} + * called. Otherwise return the provided format itself. This is a hack in order to get the same "json" input format + * to use {@link JsonReader} by default for streaming ingestion, and {@link JsonLineReader} by default for batch + * file-based ingestion. 
+ */ + public static InputFormat withLineSplittable(InputFormat format, boolean lineSplittable) + { + if (format instanceof JsonInputFormat) { + return ((JsonInputFormat) format).withLineSplittable(lineSplittable); + } else { + return format; + } + } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java index d2f1474c2aa..17a08eb1dda 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java @@ -29,6 +29,8 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.utils.CompressionUtils; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; @@ -65,6 +67,64 @@ public class JsonInputFormatTest Assert.assertEquals(format, fromJson); } + @Test + public void testWithLineSplittable() + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.TREE, "tree_omg", null, Arrays.asList("o", "mg")), + new JSONPathFieldSpec(JSONPathFieldType.TREE, "tree_omg2", null, Arrays.asList("o", "mg2")) + ) + ), + ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false), + 
true, + false, + false + ); + + Assert.assertTrue(format.isLineSplittable()); + Assert.assertFalse(format.withLineSplittable(false).isLineSplittable()); + } + + @Test + public void testWithLineSplittableStatic() + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.TREE, "tree_omg", null, Arrays.asList("o", "mg")), + new JSONPathFieldSpec(JSONPathFieldType.TREE, "tree_omg2", null, Arrays.asList("o", "mg2")) + ) + ), + ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false), + true, + false, + false + ); + + Assert.assertTrue(format.isLineSplittable()); + Assert.assertFalse(((JsonInputFormat) JsonInputFormat.withLineSplittable(format, false)).isLineSplittable()); + + // Formats other than json are passed through unchanged + final InputFormat noopInputFormat = JsonInputFormat.withLineSplittable(new NoopInputFormat(), false); + MatcherAssert.assertThat(noopInputFormat, CoreMatchers.instanceOf(NoopInputFormat.class)); + } + @Test public void testEquals() {