Clear "lineSplittable" for JSON when using KafkaInputFormat. (#15692)

* Clear "lineSplittable" for JSON when using KafkaInputFormat.

JsonInputFormat has a "withLineSplittable" method that can be used to
control whether JSON is read line-by-line, or as a whole. The intent
is that in streaming ingestion, "lineSplittable" is false (although it
can be overridden by "assumeNewlineDelimited"), and in batch ingestion,
lineSplittable is true.

When a "json" format is wrapped by a "kafka" format, this isn't set
properly. This patch updates KafkaInputFormat to set this on an
underlying "json" format.

The tests for KafkaInputFormat were overriding the "lineSplittable"
parameter explicitly, which wasn't really fair, because it made them
unrepresentative of what happens in production. Now they omit the
parameter and get the production behavior.

* Add test.

* Fix test coverage.
This commit is contained in:
Gian Merlino 2024-01-18 03:22:41 -08:00 committed by GitHub
parent d3d0c1c91e
commit 764f41d959
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 90 additions and 11 deletions

View File

@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
@ -109,12 +110,12 @@ public class KafkaInputFormat implements InputFormat
record ->
(record.getRecord().key() == null) ?
null :
keyFormat.createReader(
JsonInputFormat.withLineSplittable(keyFormat, false).createReader(
newInputRowSchema,
new ByteEntity(record.getRecord().key()),
temporaryDirectory
),
valueFormat.createReader(
JsonInputFormat.withLineSplittable(valueFormat, false).createReader(
newInputRowSchema,
source,
temporaryDirectory

View File

@ -102,7 +102,6 @@ public class KafkaInputFormatTest
new JSONPathSpec(true, ImmutableList.of()),
null,
null,
false, //make sure JsonReader is used
false,
false
),
@ -121,7 +120,6 @@ public class KafkaInputFormatTest
),
null,
null,
false, //make sure JsonReader is used
false,
false
),
@ -143,7 +141,6 @@ public class KafkaInputFormatTest
new JSONPathSpec(true, ImmutableList.of()),
null,
null,
false, //make sure JsonReader is used
false,
false
),
@ -162,7 +159,6 @@ public class KafkaInputFormatTest
),
null,
null,
false, //make sure JsonReader is used
false,
false
),
@ -485,8 +481,10 @@ public class KafkaInputFormatTest
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null, null, false, //make sure JsonReader is used
false, false
null,
null,
false,
false
),
"kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic."
);

View File

@ -125,7 +125,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
ImmutableMap.of(),
false,
false,
false,
false
);
private static final String DATASOURCE = "testDS";

View File

@ -121,7 +121,7 @@ public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, Reco
@Nullable File temporaryDirectory
)
{
InputFormat format = inputFormat instanceof JsonInputFormat ? ((JsonInputFormat) inputFormat).withLineSplittable(false) : inputFormat;
InputFormat format = JsonInputFormat.withLineSplittable(inputFormat, false);
return new InputEntityIteratingReader(
inputRowSchema,
format,

View File

@ -51,7 +51,7 @@ class SettableByteEntityReader<T extends ByteEntity> implements InputEntityReade
)
{
Preconditions.checkNotNull(inputFormat, "inputFormat");
final InputFormat format = (inputFormat instanceof JsonInputFormat) ? ((JsonInputFormat) inputFormat).withLineSplittable(false) : inputFormat;
final InputFormat format = JsonInputFormat.withLineSplittable(inputFormat, false);
this.entity = new SettableByteEntity<>();
this.delegate = new TransformingInputEntityReader(
format.createReader(inputRowSchema, entity, indexingTmpDir),

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@ -121,6 +122,11 @@ public class JsonInputFormat extends NestedInputFormat
return featureSpec;
}
/**
 * Returns whether this format is currently configured to read input line-by-line.
 * Package-private accessor (no {@code @JsonProperty}); used by tests to verify the
 * effect of {@code withLineSplittable}.
 */
boolean isLineSplittable()
{
  return lineSplittable;
}
@JsonProperty // No @JsonInclude, since default is variable, so we can't assume false is default
public boolean isKeepNullColumns()
{
@ -227,4 +233,19 @@ public class JsonInputFormat extends NestedInputFormat
", useJsonNodeReader=" + useJsonNodeReader +
'}';
}
/**
 * Returns {@code format} with its line-splittable flag set via {@link #withLineSplittable(boolean)} when it is a
 * {@link JsonInputFormat}; any other format is returned untouched. This is a hack that lets the same "json" input
 * format default to {@link JsonReader} for streaming ingestion and {@link JsonLineReader} for batch file-based
 * ingestion.
 */
public static InputFormat withLineSplittable(InputFormat format, boolean lineSplittable)
{
  return format instanceof JsonInputFormat
         ? ((JsonInputFormat) format).withLineSplittable(lineSplittable)
         : format;
}
}

View File

@ -29,6 +29,8 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.utils.CompressionUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
@ -65,6 +67,64 @@ public class JsonInputFormatTest
Assert.assertEquals(format, fromJson);
}
/**
 * Verifies that the instance-level {@code withLineSplittable(boolean)} produces a copy whose
 * line-splittable flag reflects the requested value, while the original stays line-splittable.
 */
@Test
public void testWithLineSplittable()
{
  final JSONPathSpec flattenSpec = new JSONPathSpec(
      true,
      ImmutableList.of(
          new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
          new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
          new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
          new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
          new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
          new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2"),
          new JSONPathFieldSpec(JSONPathFieldType.TREE, "tree_omg", null, Arrays.asList("o", "mg")),
          new JSONPathFieldSpec(JSONPathFieldType.TREE, "tree_omg2", null, Arrays.asList("o", "mg2"))
      )
  );

  final JsonInputFormat inputFormat = new JsonInputFormat(
      flattenSpec,
      ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false),
      true,
      false,
      false
  );

  // Constructed with lineSplittable = true.
  Assert.assertTrue(inputFormat.isLineSplittable());
  // withLineSplittable(false) yields a format with the flag cleared.
  Assert.assertFalse(inputFormat.withLineSplittable(false).isLineSplittable());
}
/**
 * Verifies that the static {@code withLineSplittable(InputFormat, boolean)} clears the flag on a
 * JSON format, and passes non-JSON formats through unchanged.
 */
@Test
public void testWithLineSplittableStatic()
{
  final JSONPathSpec flattenSpec = new JSONPathSpec(
      true,
      ImmutableList.of(
          new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
          new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
          new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
          new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
          new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
          new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2"),
          new JSONPathFieldSpec(JSONPathFieldType.TREE, "tree_omg", null, Arrays.asList("o", "mg")),
          new JSONPathFieldSpec(JSONPathFieldType.TREE, "tree_omg2", null, Arrays.asList("o", "mg2"))
      )
  );

  final JsonInputFormat jsonFormat = new JsonInputFormat(
      flattenSpec,
      ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false),
      true,
      false,
      false
  );

  Assert.assertTrue(jsonFormat.isLineSplittable());
  Assert.assertFalse(((JsonInputFormat) JsonInputFormat.withLineSplittable(jsonFormat, false)).isLineSplittable());

  // Other formats than json are passed-through unchanged
  final InputFormat untouched = JsonInputFormat.withLineSplittable(new NoopInputFormat(), false);
  MatcherAssert.assertThat(untouched, CoreMatchers.instanceOf(NoopInputFormat.class));
}
@Test
public void testEquals()
{