Track IngestionState more accurately in realtime tasks. (#16934)

Previously, SeekableStreamIndexTaskRunner set the ingestion state to
COMPLETED as soon as it finished reading data from Kafka. This was incorrect,
because the task still has to build, publish, and hand off its segments.
After the changes in this patch, the state transitions are:

1) The task stays in BUILD_SEGMENTS after it finishes reading from Kafka,
   while it is building its final set of segments to publish.

2) The task transitions to SEGMENT_AVAILABILITY_WAIT after publishing,
   while waiting for handoff.

3) The task transitions to COMPLETED immediately before exiting, when
   truly done.
This commit is contained in:
Gian Merlino 2024-08-21 23:13:46 -07:00 committed by GitHub
parent 725695342c
commit a83125e4a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 44 additions and 3 deletions

View File

@ -239,6 +239,7 @@ The `ingestionState` shows what step of ingestion the task reached. Possible sta
- `NOT_STARTED`: The task has not begun reading any rows - `NOT_STARTED`: The task has not begun reading any rows
- `DETERMINE_PARTITIONS`: The task is processing rows to determine partitioning - `DETERMINE_PARTITIONS`: The task is processing rows to determine partitioning
- `BUILD_SEGMENTS`: The task is processing rows to construct segments - `BUILD_SEGMENTS`: The task is processing rows to construct segments
- `SEGMENT_AVAILABILITY_WAIT`: The task has published its segments and is waiting for them to become available.
- `COMPLETED`: The task has finished its work. - `COMPLETED`: The task has finished its work.
Only batch tasks have the DETERMINE_PARTITIONS phase. Realtime tasks such as those created by the Kafka Indexing Service do not have a DETERMINE_PARTITIONS phase. Only batch tasks have the DETERMINE_PARTITIONS phase. Realtime tasks such as those created by the Kafka Indexing Service do not have a DETERMINE_PARTITIONS phase.

View File

@ -52,6 +52,7 @@ import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.data.input.kafka.KafkaTopicPartition; import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.data.input.kafkainput.KafkaInputFormat; import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat; import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors; import org.apache.druid.indexer.report.IngestionStatsAndErrors;
@ -1617,6 +1618,10 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
IngestionStatsAndErrors reportData = getTaskReportData(); IngestionStatsAndErrors reportData = getTaskReportData();
// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());
Map<String, Object> expectedMetrics = ImmutableMap.of( Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS, RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of( ImmutableMap.of(
@ -1697,6 +1702,10 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
IngestionStatsAndErrors reportData = getTaskReportData(); IngestionStatsAndErrors reportData = getTaskReportData();
// Verify ingestion state and error message
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
Assert.assertNotNull(reportData.getErrorMsg());
Map<String, Object> expectedMetrics = ImmutableMap.of( Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS, RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of( ImmutableMap.of(
@ -3057,9 +3066,13 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
newDataSchemaMetadata() newDataSchemaMetadata()
); );
// Verify unparseable data
IngestionStatsAndErrors reportData = getTaskReportData(); IngestionStatsAndErrors reportData = getTaskReportData();
// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());
// Verify unparseable data
ParseExceptionReport parseExceptionReport = ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS); ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
@ -3190,9 +3203,14 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(ImmutableList.of(), publishedDescriptors()); Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
Assert.assertNull(newDataSchemaMetadata()); Assert.assertNull(newDataSchemaMetadata());
// Verify ingestion state and error message
final IngestionStatsAndErrors reportData = getTaskReportData();
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
Assert.assertNotNull(reportData.getErrorMsg());
// Verify there is no unparseable data in the report since we've 0 saved parse exceptions // Verify there is no unparseable data in the report since we've 0 saved parse exceptions
ParseExceptionReport parseExceptionReport = ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(getTaskReportData(), RowIngestionMeters.BUILD_SEGMENTS); ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages()); Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages());
} }
@ -3231,6 +3249,12 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrors reportData = getTaskReportData(); IngestionStatsAndErrors reportData = getTaskReportData();
// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());
// Verify report metrics
Assert.assertEquals(reportData.getRecordsProcessed().size(), 1); Assert.assertEquals(reportData.getRecordsProcessed().size(), 1);
Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L); Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L);
} }
@ -3279,6 +3303,12 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrors reportData = getTaskReportData(); IngestionStatsAndErrors reportData = getTaskReportData();
// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());
// Verify report metrics
Assert.assertEquals(reportData.getRecordsProcessed().size(), 2); Assert.assertEquals(reportData.getRecordsProcessed().size(), 2);
Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L))); Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L)));
} }

View File

@ -43,6 +43,7 @@ import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors; import org.apache.druid.indexer.report.IngestionStatsAndErrors;
@ -1186,6 +1187,10 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
IngestionStatsAndErrors reportData = getTaskReportData(); IngestionStatsAndErrors reportData = getTaskReportData();
// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());
Map<String, Object> expectedMetrics = ImmutableMap.of( Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS, RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of( ImmutableMap.of(
@ -1272,6 +1277,10 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
IngestionStatsAndErrors reportData = getTaskReportData(); IngestionStatsAndErrors reportData = getTaskReportData();
// Verify ingestion state and error message
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
Assert.assertNotNull(reportData.getErrorMsg());
Map<String, Object> expectedMetrics = ImmutableMap.of( Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS, RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of( ImmutableMap.of(

View File

@ -768,7 +768,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
} }
} }
} }
ingestionState = IngestionState.COMPLETED;
} }
catch (Exception e) { catch (Exception e) {
// (1) catch all exceptions while reading from kafka // (1) catch all exceptions while reading from kafka
@ -835,6 +834,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// failed to persist sequences. It might also return null if handoff failed, but was recoverable. // failed to persist sequences. It might also return null if handoff failed, but was recoverable.
// See publishAndRegisterHandoff() for details. // See publishAndRegisterHandoff() for details.
List<SegmentsAndCommitMetadata> handedOffList = Collections.emptyList(); List<SegmentsAndCommitMetadata> handedOffList = Collections.emptyList();
ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
if (tuningConfig.getHandoffConditionTimeout() == 0) { if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOffList = Futures.allAsList(handOffWaitList).get(); handedOffList = Futures.allAsList(handOffWaitList).get();
} else { } else {
@ -928,6 +928,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
} }
} }
ingestionState = IngestionState.COMPLETED;
toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null, handoffWaitMs)); toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null, handoffWaitMs));
return TaskStatus.success(task.getId()); return TaskStatus.success(task.getId());
} }