Fix streaming ingestion failing if it encounters empty rows (Regression) (#10962)

* Fix streaming ingestion failing and halting if it encounters empty rows

* address comments
Maytas Monsereenusorn, 2021-03-09 12:11:58 -08:00, committed by GitHub
parent 80ec28578a
commit 4dd22a850b
4 changed files with 71 additions and 9 deletions
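
The shape of the fix, stated once: when a streaming record parses to zero input rows, JsonReader now throws a ParseException instead of returning an empty list, so the task runner counts the record as unparseable and keeps consuming rather than failing and halting. A minimal, self-contained sketch of that pattern, assuming hypothetical stand-in names (EmptyRowHandlingSketch and its nested ParseException are illustrations, not the actual Druid classes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class EmptyRowHandlingSketch
    {
      // stand-in for org.apache.druid.java.util.common.parsers.ParseException
      static class ParseException extends RuntimeException
      {
        ParseException(String message)
        {
          super(message);
        }
      }

      // stand-in for JsonReader.parseInputRows: an empty parse result is
      // surfaced as an exception instead of an empty list
      static List<String> parseInputRows(String intermediateRow)
      {
        final List<String> rows = new ArrayList<>();
        for (String line : intermediateRow.split("\n")) {
          if (!line.isEmpty()) {
            rows.add(line); // stand-in for JSON flattening + row mapping
          }
        }
        if (rows.isEmpty()) {
          throw new ParseException(
              "Unable to parse [" + intermediateRow + "] as the intermediateRow resulted in empty input row"
          );
        }
        return rows;
      }

      public static void main(String[] args)
      {
        int unparseable = 0;
        for (String record : Arrays.asList("{\"a\":1}", "", "{\"b\":2}")) {
          try {
            System.out.println("parsed: " + parseInputRows(record));
          }
          catch (ParseException e) {
            unparseable++; // counted and skipped; the stream keeps moving
          }
        }
        System.out.println("unparseable records: " + unparseable);
      }
    }

Running it prints the two parsed records and "unparseable records: 1": the empty record is accounted for and the loop continues.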

File: JsonReader.java

@@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.CollectionUtils;

 import java.io.IOException;
 import java.util.List;
@@ -90,11 +91,12 @@ public class JsonReader extends IntermediateRowParsingReader<String>
   @Override
   protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
   {
+    final List<InputRow> inputRows;
     try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) {
       final MappingIterator<JsonNode> delegate = mapper.readValues(parser, JsonNode.class);
-      return FluentIterable.from(() -> delegate)
-                           .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
-                           .toList();
+      inputRows = FluentIterable.from(() -> delegate)
+                                .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
+                                .toList();
     }
     catch (RuntimeException e) {
       //convert Jackson's JsonParseException into druid's exception for further processing
@@ -106,6 +108,10 @@ public class JsonReader extends IntermediateRowParsingReader<String>
       //throw unknown exception
       throw e;
     }
+    if (CollectionUtils.isNullOrEmpty(inputRows)) {
+      throw new ParseException("Unable to parse [%s] as the intermediateRow resulted in empty input row", intermediateRow);
+    }
+    return inputRows;
   }

   @Override
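
Two details of the restructuring above are worth noting: the FluentIterable is materialized with toList() inside the try-with-resources block, since the MappingIterator reads lazily from a parser that closes when the block exits, and the empty-row guard runs only after the catch block has had its chance to translate Jackson failures into Druid's own ParseException. The guard itself is a plain null-or-empty test; a minimal stand-in, assuming org.apache.druid.utils.CollectionUtils.isNullOrEmpty is equivalent to the helper below:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;

    public class CollectionGuardSketch
    {
      // assumed equivalent of CollectionUtils.isNullOrEmpty
      static boolean isNullOrEmpty(Collection<?> c)
      {
        return c == null || c.isEmpty();
      }

      public static void main(String[] args)
      {
        System.out.println(isNullOrEmpty(null));                     // true
        System.out.println(isNullOrEmpty(Collections.emptyList()));  // true
        System.out.println(isNullOrEmpty(Arrays.asList("row")));     // false
      }
    }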

File: JsonReaderTest.java

@@ -346,6 +346,62 @@ public class JsonReaderTest
     }
   }

+  @Test
+  public void testEmptyJSONText() throws IOException
+  {
+    final JsonInputFormat format = new JsonInputFormat(
+        new JSONPathSpec(
+            true,
+            ImmutableList.of(
+                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
+                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
+                new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
+                new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
+            )
+        ),
+        null,
+        null,
+        false //make sure JsonReader is used
+    );
+
+    //input is empty
+    final ByteEntity source = new ByteEntity(
+        StringUtils.toUtf8(
+            "" // empty row
+        )
+    );
+
+    final InputEntityReader reader = format.createReader(
+        new InputRowSchema(
+            new TimestampSpec("timestamp", "iso", null),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
+            Collections.emptyList()
+        ),
+        source,
+        null
+    );
+
+    //expect a ParseException on the following `next` call on iterator
+    expectedException.expect(ParseException.class);
+
+    //the input is empty, so no rows can be parsed from it
+    final int numExpectedIterations = 0;
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      int numActualIterations = 0;
+      while (iterator.hasNext()) {
+        iterator.next();
+        ++numActualIterations;
+      }
+      Assert.assertEquals(numExpectedIterations, numActualIterations);
+    }
+  }
+
   @Test
   public void testSampleEmptyText() throws IOException
   {
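
The streaming-task tests that follow pin down the exact error string, and it falls out of the new ParseException message: formatting "Unable to parse [%s] ..." with an empty intermediate row yields "Unable to parse [] ...". A quick check with plain String.format (Druid's ParseException applies a similar, non-strict formatter):

    public class MessageFormatSketch
    {
      public static void main(String[] args)
      {
        System.out.println(String.format(
            "Unable to parse [%s] as the intermediateRow resulted in empty input row",
            ""
        ));
        // -> Unable to parse [] as the intermediateRow resulted in empty input row
      }
    }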

File: KafkaIndexTaskTest.java

@@ -253,7 +253,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2011", "e", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
     new ProducerRecord<>(topic, 0, null, jbb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
     new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
-    new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
+    new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("")),
     new ProducerRecord<>(topic, 0, null, null),
     new ProducerRecord<>(topic, 0, null, jbb("2013", "f", "y", "10", "20.0", "1.0")),
     new ProducerRecord<>(topic, 0, null, jbb("2049", "f", "y", "notanumber", "20.0", "1.0")),
@@ -1491,7 +1491,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=20.0, met1=notanumber}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [Unable to parse value[notanumber] for field[met1]]",
     "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=notanumber, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to float]",
     "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=notanumber, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to long]",
-    "Unable to parse row [unparseable2]",
+    "Unable to parse [] as the intermediateRow resulted in empty input row",
     "Unable to parse row [unparseable]",
     "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
 )
@@ -1560,7 +1560,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     Map<String, Object> unparseableEvents = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         Arrays.asList(
-            "Unable to parse row [unparseable2]",
+            "Unable to parse [] as the intermediateRow resulted in empty input row",
             "Unable to parse row [unparseable]"
         )
     );

File: KinesisIndexTaskTest.java

@@ -289,7 +289,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
         stream,
         "1",
         "7",
-        Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable2")))
+        Collections.singletonList(new ByteEntity(StringUtils.toUtf8("")))
     ),
     new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}")))),
     new OrderedPartitionableRecord<>(stream, "1", "9", jbl("2013", "f", "y", "10", "20.0", "1.0")),
@@ -1347,7 +1347,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
     "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=notanumber, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to float]",
     "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=notanumber, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to long]",
     "Timestamp[null] is unparseable! Event: {}",
-    "Unable to parse row [unparseable2]",
+    "Unable to parse [] as the intermediateRow resulted in empty input row",
     "Unable to parse row [unparseable]",
     "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
 )
@@ -1434,7 +1434,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
     Map<String, Object> unparseableEvents = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         Arrays.asList(
-            "Unable to parse row [unparseable2]",
+            "Unable to parse [] as the intermediateRow resulted in empty input row",
            "Unable to parse row [unparseable]"
        )
    );