From a83125e4a00c56563709e664a4e9685a834c7ec8 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 21 Aug 2024 23:13:46 -0700 Subject: [PATCH] Track IngestionState more accurately in realtime tasks. (#16934) Previously, SeekableStreamIndexTaskRunner set ingestion state to COMPLETED when it finished reading data from Kafka. This is incorrect. After the changes in this patch, the transitions go: 1) The task stays in BUILD_SEGMENTS after it finishes reading from Kafka, while it is building its final set of segments to publish. 2) The task transitions to SEGMENT_AVAILABILITY_WAIT after publishing, while waiting for handoff. 3) The task transitions to COMPLETED immediately before exiting, when truly done. --- docs/ingestion/tasks.md | 1 + .../indexing/kafka/KafkaIndexTaskTest.java | 34 +++++++++++++++++-- .../kinesis/KinesisIndexTaskTest.java | 9 +++++ .../SeekableStreamIndexTaskRunner.java | 3 +- 4 files changed, 44 insertions(+), 3 deletions(-) diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 3291abb01a0..743a46cc854 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -239,6 +239,7 @@ The `ingestionState` shows what step of ingestion the task reached. Possible sta - `NOT_STARTED`: The task has not begun reading any rows - `DETERMINE_PARTITIONS`: The task is processing rows to determine partitioning - `BUILD_SEGMENTS`: The task is processing rows to construct segments +- `SEGMENT_AVAILABILITY_WAIT`: The task has published its segments and is waiting for them to become available. - `COMPLETED`: The task has finished its work. Only batch tasks have the DETERMINE_PARTITIONS phase. Realtime tasks such as those created by the Kafka Indexing Service do not have a DETERMINE_PARTITIONS phase. diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 15b77be307d..f8c6b23aae9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -52,6 +52,7 @@ import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.data.input.kafka.KafkaTopicPartition; import org.apache.druid.data.input.kafkainput.KafkaInputFormat; import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat; +import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.report.IngestionStatsAndErrors; @@ -1617,6 +1618,10 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase IngestionStatsAndErrors reportData = getTaskReportData(); + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); + Assert.assertNull(reportData.getErrorMsg()); + Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -1697,6 +1702,10 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase IngestionStatsAndErrors reportData = getTaskReportData(); + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState()); + Assert.assertNotNull(reportData.getErrorMsg()); + Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -3057,9 +3066,13 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase newDataSchemaMetadata() ); - // Verify unparseable data IngestionStatsAndErrors reportData = getTaskReportData(); + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); + Assert.assertNull(reportData.getErrorMsg()); + + // Verify unparseable data ParseExceptionReport parseExceptionReport = ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS); @@ -3190,9 +3203,14 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(ImmutableList.of(), publishedDescriptors()); Assert.assertNull(newDataSchemaMetadata()); + // Verify ingestion state and error message + final IngestionStatsAndErrors reportData = getTaskReportData(); + Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState()); + Assert.assertNotNull(reportData.getErrorMsg()); + // Verify there is no unparseable data in the report since we've 0 saved parse exceptions ParseExceptionReport parseExceptionReport = - ParseExceptionReport.forPhase(getTaskReportData(), RowIngestionMeters.BUILD_SEGMENTS); + ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS); Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages()); } @@ -3231,6 +3249,12 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); IngestionStatsAndErrors reportData = getTaskReportData(); + + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); + Assert.assertNull(reportData.getErrorMsg()); + + // Verify report metrics Assert.assertEquals(reportData.getRecordsProcessed().size(), 1); Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L); } @@ -3279,6 +3303,12 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); IngestionStatsAndErrors reportData = getTaskReportData(); + + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); + Assert.assertNull(reportData.getErrorMsg()); + + // Verify report metrics Assert.assertEquals(reportData.getRecordsProcessed().size(), 2); Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L))); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 80bded2031d..510eaa797e0 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -43,6 +43,7 @@ import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.kinesis.KinesisRecordEntity; +import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.report.IngestionStatsAndErrors; @@ -1186,6 +1187,10 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase IngestionStatsAndErrors reportData = getTaskReportData(); + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); + Assert.assertNull(reportData.getErrorMsg()); + Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -1272,6 +1277,10 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase IngestionStatsAndErrors reportData = getTaskReportData(); + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState()); + Assert.assertNotNull(reportData.getErrorMsg()); + Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index d347fd81503..df8e220145b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -768,7 +768,6 @@ public abstract class SeekableStreamIndexTaskRunner handedOffList = Collections.emptyList(); + ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT; if (tuningConfig.getHandoffConditionTimeout() == 0) { handedOffList = Futures.allAsList(handOffWaitList).get(); } else { @@ -928,6 +928,7 @@ public abstract class SeekableStreamIndexTaskRunner