From 1f443d218cae544d9a2d024fd91c27a9daf1b5b0 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam <35785271+adithyachakilam@users.noreply.github.com> Date: Fri, 23 Feb 2024 04:59:03 -0600 Subject: [PATCH] Enable partition stats on streaming task completion report (#15930) Changes: - Add visibility into number of records processed by each streaming task per partition - Add field `recordsProcessed` to `IngestionStatsAndErrorsTaskReportData` - Populate number of records processed per partition in `SeekableStreamIndexTaskRunner` --- docs/ingestion/tasks.md | 4 + .../indexing/kafka/KafkaIndexTaskTest.java | 86 +++++++++++++++++++ ...IngestionStatsAndErrorsTaskReportData.java | 21 ++++- .../AppenderatorDriverRealtimeIndexTask.java | 3 +- .../indexing/common/task/HadoopIndexTask.java | 3 +- .../druid/indexing/common/task/IndexTask.java | 3 +- .../parallel/ParallelIndexSupervisorTask.java | 3 +- .../parallel/PartialSegmentGenerateTask.java | 3 +- .../batch/parallel/SinglePhaseSubTask.java | 3 +- .../SeekableStreamIndexTaskRunner.java | 11 ++- .../common/task/TaskReportSerdeTest.java | 3 +- 11 files changed, 132 insertions(+), 11 deletions(-) diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 855c9824347..af9d8b7f88b 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -83,6 +83,9 @@ An example output is shown below: }, "segmentAvailabilityConfirmed": false, "segmentAvailabilityWaitTimeMs": 0, + "recordsProcessed": { + "partition-a": 5789 + }, "errorMsg": null }, "type": "ingestionStatsAndErrors" @@ -98,6 +101,7 @@ For some task types, the indexing task can wait for the newly ingested segments |---|---| |`segmentAvailabilityConfirmed`|Whether all segments generated by this ingestion task had been confirmed as available for queries in the cluster before the task completed.| |`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to be available for query after completing ingestion was completed.| +|`recordsProcessed`| Partitions that were processed by an ingestion task and includes count of records processed from each partition.| ### Live report diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index b2a9702da5e..c3d1d213383 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -3200,6 +3200,92 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages()); } + @Test(timeout = 60_000L) + public void testCompletionReportPartitionStats() throws Exception + { + insertData(); + + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamStartSequenceNumbers<>( + topic, + ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + topic, + ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 10L) + ), + kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + INPUT_FORMAT, + null + ) + ); + + final ListenableFuture future = runTask(task); + TaskStatus status = future.get(); + + Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); + IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + Assert.assertEquals(reportData.getRecordsProcessed().size(), 1); + Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L); + } + + @Test(timeout = 60_000L) + public void testCompletionReportMultiplePartitionStats() throws Exception + { + insertData(); + + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamStartSequenceNumbers<>( + topic, + ImmutableMap.of( + new KafkaTopicPartition(false, topic, 0), + 0L, + new KafkaTopicPartition(false, topic, 1), + 0L + ), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + topic, + ImmutableMap.of( + new KafkaTopicPartition(false, topic, 0), + 10L, + new KafkaTopicPartition(false, topic, 1), + 2L + ) + ), + kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + INPUT_FORMAT, + null + ) + ); + + final ListenableFuture future = runTask(task); + TaskStatus status = future.get(); + + Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); + IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + Assert.assertEquals(reportData.getRecordsProcessed().size(), 2); + Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L))); + } + public static class TestKafkaInputFormat implements InputFormat { final InputFormat baseInputFormat; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index 28388770c05..ecdd9d3aeba 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -47,13 +47,17 @@ public class IngestionStatsAndErrorsTaskReportData @JsonProperty private long segmentAvailabilityWaitTimeMs; + @JsonProperty + private Map recordsProcessed; + public IngestionStatsAndErrorsTaskReportData( @JsonProperty("ingestionState") IngestionState ingestionState, @JsonProperty("unparseableEvents") Map unparseableEvents, @JsonProperty("rowStats") Map rowStats, @JsonProperty("errorMsg") @Nullable String errorMsg, @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, - @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs + @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs, + @JsonProperty("recordsProcessed") Map recordsProcessed ) { this.ingestionState = ingestionState; @@ -62,6 +66,7 @@ public class IngestionStatsAndErrorsTaskReportData this.errorMsg = errorMsg; this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed; this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs; + this.recordsProcessed = recordsProcessed; } @JsonProperty @@ -101,6 +106,13 @@ public class IngestionStatsAndErrorsTaskReportData return segmentAvailabilityWaitTimeMs; } + @JsonProperty + @Nullable + public Map getRecordsProcessed() + { + return recordsProcessed; + } + public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports( Map taskReports ) @@ -124,7 +136,8 @@ public class IngestionStatsAndErrorsTaskReportData Objects.equals(getRowStats(), that.getRowStats()) && Objects.equals(getErrorMsg(), that.getErrorMsg()) && Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) && - Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()); + Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()) && + Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()); } @Override @@ -136,7 +149,8 @@ public class IngestionStatsAndErrorsTaskReportData getRowStats(), getErrorMsg(), isSegmentAvailabilityConfirmed(), - getSegmentAvailabilityWaitTimeMs() + getSegmentAvailabilityWaitTimeMs(), + getRecordsProcessed() ); } @@ -150,6 +164,7 @@ public class IngestionStatsAndErrorsTaskReportData ", errorMsg='" + errorMsg + '\'' + ", segmentAvailabilityConfoirmed=" + segmentAvailabilityConfirmed + ", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs + + ", recordsProcessed=" + recordsProcessed + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 9cee79b6308..9ea94b844cc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -620,7 +620,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements getTaskCompletionRowStats(), errorMsg, errorMsg == null, - 0L + 0L, + Collections.emptyMap() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 37bbf647e74..1422720bf94 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -693,7 +693,8 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler getTaskCompletionRowStats(), errorMsg, segmentAvailabilityConfirmationCompleted, - segmentAvailabilityWaitTimeMs + segmentAvailabilityWaitTimeMs, + Collections.emptyMap() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index e90f531b58d..a0d7f4143f7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -579,7 +579,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler getTaskCompletionRowStats(), errorMsg, segmentAvailabilityConfirmationCompleted, - segmentAvailabilityWaitTimeMs + segmentAvailabilityWaitTimeMs, + Collections.emptyMap() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index df987798aa7..d0afc075617 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -1240,7 +1240,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen rowStatsAndUnparseableEvents.lhs, taskStatus.getErrorMsg(), segmentAvailabilityConfirmed, - segmentAvailabilityWaitTimeMs + segmentAvailabilityWaitTimeMs, + Collections.emptyMap() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index e20c7bdbe35..9b1a90b5791 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -247,7 +247,8 @@ abstract class PartialSegmentGenerateTask e getTaskCompletionRowStats(), "", false, // not applicable for parallel subtask - segmentAvailabilityWaitTimeMs + segmentAvailabilityWaitTimeMs, + Collections.emptyMap() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index a3bd47d2960..8d9bd8f7b6d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -643,7 +643,8 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand getTaskCompletionRowStats(), errorMsg, false, // not applicable for parallel subtask - segmentAvailabilityWaitTimeMs + segmentAvailabilityWaitTimeMs, + Collections.emptyMap() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 9fc743bd23b..c7f77cea566 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -243,6 +243,8 @@ public abstract class SeekableStreamIndexTaskRunner> sequences; private volatile Throwable backgroundThreadException; + private final Map partitionsThroughput = new HashMap<>(); + public SeekableStreamIndexTaskRunner( final SeekableStreamIndexTask task, @Nullable final InputRowParser parser, @@ -683,6 +685,7 @@ public abstract class SeekableStreamIndexTaskRunner getPartitionStats() + { + return CollectionUtils.mapKeys(partitionsThroughput, key -> key.toString()); + } + private Map getTaskCompletionUnparseableEvents() { Map unparseableEventsMap = new HashMap<>(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 4231f318efd..13a850f5ee7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -69,7 +69,8 @@ public class TaskReportSerdeTest ), "an error message", true, - 1000L + 1000L, + ImmutableMap.of("PartitionA", 5000L) ) ); String report1serialized = jsonMapper.writeValueAsString(report1);