From 46dabab36dc86d307f3c96bee5b833dfe7be124e Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Fri, 5 May 2023 10:05:41 -0700 Subject: [PATCH] Fix NPE in test parse exception report. Add more tests with different thresholds. (#14209) --- .../indexing/kafka/KafkaIndexTaskTest.java | 127 ++++++++++++++++++ .../common/task/ParseExceptionReport.java | 11 +- 2 files changed, 133 insertions(+), 5 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 47c04d18a3a..bd4f55911e0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -3072,6 +3072,133 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages()); } + @Test(timeout = 60_000L) + public void testNoParseExceptionsTaskSucceeds() throws Exception + { + reportParseExceptions = false; + maxParseExceptions = 0; + maxSavedParseExceptions = 0; + + // Prepare records and insert data + records = Arrays.asList( + new ProducerRecord(topic, 0, null, + jbb("2049", "a", "y", "10", "20.0", "1.0"), + SAMPLE_HEADERS), + new ProducerRecord(topic, 0, null, + jbb("2049", "b", "y", "10", "20.0", "1.0"), + SAMPLE_HEADERS) + ); + insertData(); + + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 2L)), + kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), + null + ) + ); + + Assert.assertTrue(task.supportsQueries()); + final ListenableFuture future = runTask(task); + + // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 2)).unparseable(0).totalProcessed(2)); + + // Check published metadata + assertEqualsExceptVersion( + ImmutableList.of( + // 2 rows at last in druid + sdd("2049/P1D", 0, ImmutableList.of("a", "b")) + ), + publishedDescriptors() + ); + Assert.assertEquals( + new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 2L))), + newDataSchemaMetadata() + ); + + // Verify there is no unparseable data in the report since we've no parse exceptions and no saved parse exceptions + ParseExceptionReport parseExceptionReport = + ParseExceptionReport.forPhase(getTaskReportData(), RowIngestionMeters.BUILD_SEGMENTS); + + Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages()); + } + + @Test(timeout = 60_000L) + public void testParseExceptionsBeyondThresholdTaskFails() throws Exception + { + reportParseExceptions = false; + maxParseExceptions = 1; + maxSavedParseExceptions = 0; + + // Prepare records and insert data + records = Arrays.asList( + new ProducerRecord(topic, 0, null, + jbb("2049", "a", "y", "10", "20.0", "1.0"), + SAMPLE_HEADERS), + new ProducerRecord(topic, 0, null, + jbb("200", TestKafkaFormatWithMalformedDataDetection.MALFORMED_KEY, "y1", "10", "20.0", "1.0"), + SAMPLE_HEADERS), + new ProducerRecord(topic, 0, null, + jbb("2009", TestKafkaFormatWithMalformedDataDetection.MALFORMED_KEY, "y2", "10", "20.0", "1.0"), + SAMPLE_HEADERS), + new ProducerRecord(topic, 0, null, + jbb("2049", "b", "y", "10", "21.0", "1.0"), + SAMPLE_HEADERS), + new ProducerRecord(topic, 0, null, + jbb("200", TestKafkaFormatWithMalformedDataDetection.MALFORMED_KEY, "y3", "10", "20.0", "1.0"), + SAMPLE_HEADERS), + new ProducerRecord(topic, 0, null, + jbb("2049", "c", "y", "10", "21.0", "1.0"), + SAMPLE_HEADERS) + ); + insertData(); + + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 6L)), + kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), + null + ) + ); + + Assert.assertTrue(task.supportsQueries()); + final ListenableFuture future = runTask(task); + + // Wait for task to exit. Should fail and trip up with the first two bad messages in the stream + Assert.assertEquals(TaskState.FAILED, future.get().getStatusCode()); + verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 3)).unparseable(2).totalProcessed(1)); + + // Check there's no published metadata since the task failed + Assert.assertEquals(ImmutableList.of(), publishedDescriptors()); + Assert.assertNull(newDataSchemaMetadata()); + + // Verify there is no unparseable data in the report since we've 0 saved parse exceptions + ParseExceptionReport parseExceptionReport = + ParseExceptionReport.forPhase(getTaskReportData(), RowIngestionMeters.BUILD_SEGMENTS); + + Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages()); + } + public static class TestKafkaInputFormat implements InputFormat { final InputFormat baseInputFormat; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java index cd5efbd88bc..bc7148c5401 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java @@ -47,13 +47,14 @@ public class ParseExceptionReport { List> events = (List>) reportData.getUnparseableEvents().get(phase); - final List inputs = new ArrayList<>(); final List errorMessages = new ArrayList<>(); - events.forEach(event -> { - inputs.add((String) event.get("input")); - errorMessages.add(((List) event.get("details")).get(0)); - }); + if (events != null) { + events.forEach(event -> { + inputs.add((String) event.get("input")); + errorMessages.add(((List) event.get("details")).get(0)); + }); + } return new ParseExceptionReport(inputs, errorMessages); }