Fix NPE in test parse exception report. Add more tests with different thresholds. (#14209)

Abhishek Radhakrishnan 2023-05-05 10:05:41 -07:00 committed by GitHub
parent 01e88848ce
commit 46dabab36d
2 changed files with 133 additions and 5 deletions
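Root cause: in ParseExceptionReport.forPhase, reportData.getUnparseableEvents().get(phase) returns null when the given phase recorded no unparseable events (for example, when maxSavedParseExceptions is 0), and the unconditional events.forEach(...) then throws a NullPointerException. A minimal standalone sketch of the failure mode (hypothetical code, not part of this commit):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NpeSketch
{
  public static void main(String[] args)
  {
    Map<String, List<String>> unparseableEvents = new HashMap<>();
    // No events were saved for this phase, so the lookup returns null.
    List<String> events = unparseableEvents.get("buildSegments");
    // Calling forEach on the null reference throws the NullPointerException
    // that the guard added in this commit prevents.
    events.forEach(System.out::println);
  }
}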


@@ -3072,6 +3072,133 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages());
}
@Test(timeout = 60_000L)
public void testNoParseExceptionsTaskSucceeds() throws Exception
{
reportParseExceptions = false;
maxParseExceptions = 0;
maxSavedParseExceptions = 0;
// Prepare records and insert data
records = Arrays.asList(
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("2049", "a", "y", "10", "20.0", "1.0"),
SAMPLE_HEADERS),
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("2049", "b", "y", "10", "20.0", "1.0"),
SAMPLE_HEADERS)
);
insertData();
final KafkaIndexTask task = createTask(
null,
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 2L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT),
null
)
);
Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 2)).unparseable(0).totalProcessed(2));
// Check published metadata
assertEqualsExceptVersion(
ImmutableList.of(
// both rows should be published to Druid
sdd("2049/P1D", 0, ImmutableList.of("a", "b"))
),
publishedDescriptors()
);
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 2L))),
newDataSchemaMetadata()
);
// Verify the report contains no unparseable data since there are no parse exceptions and no saved parse exceptions
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(getTaskReportData(), RowIngestionMeters.BUILD_SEGMENTS);
Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages());
}
@Test(timeout = 60_000L)
public void testParseExceptionsBeyondThresholdTaskFails() throws Exception
{
reportParseExceptions = false;
maxParseExceptions = 1;
maxSavedParseExceptions = 0;
// Prepare records and insert data
records = Arrays.asList(
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("2049", "a", "y", "10", "20.0", "1.0"),
SAMPLE_HEADERS),
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("200", TestKafkaFormatWithMalformedDataDetection.MALFORMED_KEY, "y1", "10", "20.0", "1.0"),
SAMPLE_HEADERS),
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("2009", TestKafkaFormatWithMalformedDataDetection.MALFORMED_KEY, "y2", "10", "20.0", "1.0"),
SAMPLE_HEADERS),
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("2049", "b", "y", "10", "21.0", "1.0"),
SAMPLE_HEADERS),
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("200", TestKafkaFormatWithMalformedDataDetection.MALFORMED_KEY, "y3", "10", "20.0", "1.0"),
SAMPLE_HEADERS),
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("2049", "c", "y", "10", "21.0", "1.0"),
SAMPLE_HEADERS)
);
insertData();
final KafkaIndexTask task = createTask(
null,
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 6L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT),
null
)
);
Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for the task to exit. It should fail once the first two malformed messages in the stream exceed the maxParseExceptions threshold of 1
Assert.assertEquals(TaskState.FAILED, future.get().getStatusCode());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 3)).unparseable(2).totalProcessed(1));
// Check there's no published metadata since the task failed
Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
Assert.assertNull(newDataSchemaMetadata());
// Verify the report contains no unparseable data since maxSavedParseExceptions is 0
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(getTaskReportData(), RowIngestionMeters.BUILD_SEGMENTS);
Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages());
}
public static class TestKafkaInputFormat implements InputFormat
{
final InputFormat baseInputFormat;
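The tests above wrap the base input format in TestKafkaFormatWithMalformedDataDetection which, judging by its name and the MALFORMED_KEY sentinel, delegates to the wrapped format but fails to parse any record containing the sentinel. A self-contained sketch of that delegating pattern, with a plain java.util.function.Function standing in for Druid's InputFormat API (all names here are hypothetical):

import java.util.function.Function;

public class MalformedDataDetectingParser implements Function<String, String>
{
  public static final String MALFORMED_KEY = "malformed";

  private final Function<String, String> baseParser;

  public MalformedDataDetectingParser(Function<String, String> baseParser)
  {
    this.baseParser = baseParser;
  }

  @Override
  public String apply(String record)
  {
    if (record.contains(MALFORMED_KEY)) {
      // Surfaces the same way a parse exception does during ingestion,
      // incrementing the unparseable row count.
      throw new IllegalArgumentException("Unable to parse record: " + record);
    }
    return baseParser.apply(record);
  }
}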


@@ -47,13 +47,14 @@ public class ParseExceptionReport
{
List<LinkedHashMap<String, Object>> events =
(List<LinkedHashMap<String, Object>>) reportData.getUnparseableEvents().get(phase);
final List<String> inputs = new ArrayList<>();
final List<String> errorMessages = new ArrayList<>();
- events.forEach(event -> {
-   inputs.add((String) event.get("input"));
-   errorMessages.add(((List<String>) event.get("details")).get(0));
- });
+ if (events != null) {
+   events.forEach(event -> {
+     inputs.add((String) event.get("input"));
+     errorMessages.add(((List<String>) event.get("details")).get(0));
+   });
+ }
return new ParseExceptionReport(inputs, errorMessages);
}
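Assembled from the hunk above, the fixed forPhase method reads roughly as follows; the signature and parameter type are inferred from the call sites in the tests (ParseExceptionReport.forPhase(getTaskReportData(), RowIngestionMeters.BUILD_SEGMENTS)), so treat them as assumptions rather than the verbatim source:

@SuppressWarnings("unchecked")
public static ParseExceptionReport forPhase(
    IngestionStatsAndErrorsTaskReportData reportData, // parameter type assumed from context
    String phase
)
{
  List<LinkedHashMap<String, Object>> events =
      (List<LinkedHashMap<String, Object>>) reportData.getUnparseableEvents().get(phase);
  final List<String> inputs = new ArrayList<>();
  final List<String> errorMessages = new ArrayList<>();
  // The null guard is the fix: a phase with no saved unparseable events maps
  // to null, which previously crashed the forEach with a NullPointerException.
  if (events != null) {
    events.forEach(event -> {
      inputs.add((String) event.get("input"));
      errorMessages.add(((List<String>) event.get("details")).get(0));
    });
  }
  return new ParseExceptionReport(inputs, errorMessages);
}

With the guard in place, both new tests can assert an empty error-message list via getErrorMessages() even when maxSavedParseExceptions is 0.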