From 4dd22a850b2e5acc8d9732513303fe6e3560d3d0 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 9 Mar 2021 12:11:58 -0800 Subject: [PATCH] Fix streaming ingestion fails if it encounters empty rows (Regression) (#10962) * Fix streaming ingestion fails and halt if it encounters empty rows * address comments --- .../druid/data/input/impl/JsonReader.java | 12 +++- .../druid/data/input/impl/JsonReaderTest.java | 56 +++++++++++++++++++ .../indexing/kafka/KafkaIndexTaskTest.java | 6 +- .../kinesis/KinesisIndexTaskTest.java | 6 +- 4 files changed, 71 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java index 4abeb0da340..34b059299bc 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java @@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.common.parsers.ObjectFlattener; import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.CollectionUtils; import java.io.IOException; import java.util.List; @@ -90,11 +91,12 @@ public class JsonReader extends IntermediateRowParsingReader @Override protected List parseInputRows(String intermediateRow) throws IOException, ParseException { + final List inputRows; try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) { final MappingIterator delegate = mapper.readValues(parser, JsonNode.class); - return FluentIterable.from(() -> delegate) - .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode))) - .toList(); + inputRows = FluentIterable.from(() -> delegate) + .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode))) + .toList(); } catch (RuntimeException e) { //convert Jackson's JsonParseException into druid's exception for further processing @@ -106,6 +108,10 @@ public class JsonReader extends IntermediateRowParsingReader //throw unknown exception throw e; } + if (CollectionUtils.isNullOrEmpty(inputRows)) { + throw new ParseException("Unable to parse [%s] as the intermediateRow resulted in empty input row", intermediateRow); + } + return inputRows; } @Override diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java index a519ee01c34..f554034469f 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java @@ -346,6 +346,62 @@ public class JsonReaderTest } } + @Test + public void testEmptyJSONText() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false //make sure JsonReader is used + ); + + //input is empty + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8( + "" // empty row + ) + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + Collections.emptyList() + ), + source, + null + ); + + //expect a ParseException on the following `next` call on iterator + expectedException.expect(ParseException.class); + + // the 2nd line is ill-formed, so the parse of this text chunk aborts + final int numExpectedIterations = 0; + + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + iterator.next(); + ++numActualIterations; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + + @Test public void testSampleEmptyText() throws IOException { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index caf6ea8fd6f..992bb19b626 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -253,7 +253,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase new ProducerRecord(topic, 0, null, jbb("2011", "e", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS), new ProducerRecord<>(topic, 0, null, jbb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")), - new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")), + new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("")), new ProducerRecord<>(topic, 0, null, null), new ProducerRecord<>(topic, 0, null, jbb("2013", "f", "y", "10", "20.0", "1.0")), new ProducerRecord<>(topic, 0, null, jbb("2049", "f", "y", "notanumber", "20.0", "1.0")), @@ -1491,7 +1491,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=20.0, met1=notanumber}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [Unable to parse value[notanumber] for field[met1]]", "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=notanumber, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to float]", "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=notanumber, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to long]", - "Unable to parse row [unparseable2]", + "Unable to parse [] as the intermediateRow resulted in empty input row", "Unable to parse row [unparseable]", "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]" ) @@ -1560,7 +1560,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Map unparseableEvents = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, Arrays.asList( - "Unable to parse row [unparseable2]", + "Unable to parse [] as the intermediateRow resulted in empty input row", "Unable to parse row [unparseable]" ) ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index f6a8542420f..745375d96ba 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -289,7 +289,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase stream, "1", "7", - Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable2"))) + Collections.singletonList(new ByteEntity(StringUtils.toUtf8(""))) ), new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}")))), new OrderedPartitionableRecord<>(stream, "1", "9", jbl("2013", "f", "y", "10", "20.0", "1.0")), @@ -1347,7 +1347,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=notanumber, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to float]", "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=notanumber, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to long]", "Timestamp[null] is unparseable! Event: {}", - "Unable to parse row [unparseable2]", + "Unable to parse [] as the intermediateRow resulted in empty input row", "Unable to parse row [unparseable]", "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]" ) @@ -1434,7 +1434,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase Map unparseableEvents = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, Arrays.asList( - "Unable to parse row [unparseable2]", + "Unable to parse [] as the intermediateRow resulted in empty input row", "Unable to parse row [unparseable]" ) );