Enable partition stats on streaming task completion report (#15930)

Changes:
- Add visibility into the number of records each streaming task processes per partition
- Add a `recordsProcessed` field to `IngestionStatsAndErrorsTaskReportData`
- Populate the per-partition record counts in `SeekableStreamIndexTaskRunner` (see the sketch below)
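
Conceptually, the change reduces to the following accumulation pattern. This is a minimal, self-contained sketch, not the actual Druid code (the class and method names are illustrative); the real implementation lives in `SeekableStreamIndexTaskRunner`, shown in the diff below:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the accumulation pattern introduced by this commit:
// count each processed record against its partition, then expose a snapshot
// for the completion report's "recordsProcessed" field.
class PartitionThroughputSketch
{
  private final Map<String, Long> recordsProcessed = new HashMap<>();

  void onRecordProcessed(String partitionId)
  {
    // merge() increments the existing count, or starts at 1 for a new partition
    recordsProcessed.merge(partitionId, 1L, Long::sum);
  }

  Map<String, Long> snapshotForReport()
  {
    return Map.copyOf(recordsProcessed);  // requires Java 10+
  }
}
```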
Adithya Chakilam authored 2024-02-23 04:59:03 -06:00, committed by GitHub
parent 3011829419
commit 1f443d218c
11 changed files with 132 additions and 11 deletions


@@ -83,6 +83,9 @@ An example output is shown below:
         },
         "segmentAvailabilityConfirmed": false,
         "segmentAvailabilityWaitTimeMs": 0,
+        "recordsProcessed": {
+          "partition-a": 5789
+        },
         "errorMsg": null
       },
       "type": "ingestionStatsAndErrors"
@@ -98,6 +101,7 @@ For some task types, the indexing task can wait for the newly ingested segments
 |---|---|
 |`segmentAvailabilityConfirmed`|Whether all segments generated by this ingestion task had been confirmed as available for queries in the cluster before the task completed.|
 |`segmentAvailabilityWaitTimeMs`|Milliseconds the ingestion task waited for the newly ingested segments to become available for query after completing ingestion.|
+|`recordsProcessed`|The partitions processed by the ingestion task, with a count of records processed from each partition.|
 
 ### Live report

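Once a streaming task completes, the report (including the new field) can be retrieved from the Overlord's task-reports endpoint. Below is a hedged consumer-side sketch, assuming Java 11+ and Jackson on the classpath; the Overlord address and task id are placeholders:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class RecordsProcessedFetchSketch
{
  public static void main(String[] args) throws Exception
  {
    String overlord = "http://localhost:8090";     // placeholder Overlord address
    String taskId = "index_kafka_mytopic_abc123";  // placeholder task id

    // Fetch the completed task's reports from the Overlord.
    HttpRequest request = HttpRequest
        .newBuilder(URI.create(overlord + "/druid/indexer/v1/task/" + taskId + "/reports"))
        .GET()
        .build();
    String body = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString())
        .body();

    // The payload shown in the docs diff above lives under
    // ingestionStatsAndErrors.payload; the new field is recordsProcessed.
    JsonNode recordsProcessed = new ObjectMapper()
        .readTree(body)
        .path("ingestionStatsAndErrors")
        .path("payload")
        .path("recordsProcessed");
    recordsProcessed.fields().forEachRemaining(
        e -> System.out.println(e.getKey() + " -> " + e.getValue().asLong())
    );
  }
}
```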

@@ -3200,6 +3200,92 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages());
   }
 
+  @Test(timeout = 60_000L)
+  public void testCompletionReportPartitionStats() throws Exception
+  {
+    insertData();
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamStartSequenceNumbers<>(
+                topic,
+                ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 0L),
+                ImmutableSet.of()
+            ),
+            new SeekableStreamEndSequenceNumbers<>(
+                topic,
+                ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 10L)
+            ),
+            kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            INPUT_FORMAT,
+            null
+        )
+    );
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    TaskStatus status = future.get();
+
+    Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
+
+    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+
+    Assert.assertEquals(1, reportData.getRecordsProcessed().size());
+    Assert.assertEquals((Long) 6L, reportData.getRecordsProcessed().values().iterator().next());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testCompletionReportMultiplePartitionStats() throws Exception
+  {
+    insertData();
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamStartSequenceNumbers<>(
+                topic,
+                ImmutableMap.of(
+                    new KafkaTopicPartition(false, topic, 0),
+                    0L,
+                    new KafkaTopicPartition(false, topic, 1),
+                    0L
+                ),
+                ImmutableSet.of()
+            ),
+            new SeekableStreamEndSequenceNumbers<>(
+                topic,
+                ImmutableMap.of(
+                    new KafkaTopicPartition(false, topic, 0),
+                    10L,
+                    new KafkaTopicPartition(false, topic, 1),
+                    2L
+                )
+            ),
+            kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            INPUT_FORMAT,
+            null
+        )
+    );
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    TaskStatus status = future.get();
+
+    Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
+
+    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+
+    Assert.assertEquals(2, reportData.getRecordsProcessed().size());
+    Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L)));
+  }
+
   public static class TestKafkaInputFormat implements InputFormat
   {
     final InputFormat baseInputFormat;


@@ -47,13 +47,17 @@ public class IngestionStatsAndErrorsTaskReportData
   @JsonProperty
   private long segmentAvailabilityWaitTimeMs;
 
+  @JsonProperty
+  private Map<String, Long> recordsProcessed;
+
   public IngestionStatsAndErrorsTaskReportData(
       @JsonProperty("ingestionState") IngestionState ingestionState,
       @JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
       @JsonProperty("rowStats") Map<String, Object> rowStats,
       @JsonProperty("errorMsg") @Nullable String errorMsg,
       @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed,
-      @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs
+      @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs,
+      @JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed
   )
   {
     this.ingestionState = ingestionState;
@@ -62,6 +66,7 @@ public class IngestionStatsAndErrorsTaskReportData
     this.errorMsg = errorMsg;
     this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed;
     this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs;
+    this.recordsProcessed = recordsProcessed;
   }
 
   @JsonProperty
@@ -101,6 +106,13 @@ public class IngestionStatsAndErrorsTaskReportData
     return segmentAvailabilityWaitTimeMs;
   }
 
+  @JsonProperty
+  @Nullable
+  public Map<String, Long> getRecordsProcessed()
+  {
+    return recordsProcessed;
+  }
+
   public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports(
       Map<String, TaskReport> taskReports
   )
@@ -124,7 +136,8 @@ public class IngestionStatsAndErrorsTaskReportData
            Objects.equals(getRowStats(), that.getRowStats()) &&
            Objects.equals(getErrorMsg(), that.getErrorMsg()) &&
            Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) &&
-           Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs());
+           Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()) &&
+           Objects.equals(getRecordsProcessed(), that.getRecordsProcessed());
   }
 
   @Override
@@ -136,7 +149,8 @@ public class IngestionStatsAndErrorsTaskReportData
         getRowStats(),
         getErrorMsg(),
         isSegmentAvailabilityConfirmed(),
-        getSegmentAvailabilityWaitTimeMs()
+        getSegmentAvailabilityWaitTimeMs(),
+        getRecordsProcessed()
     );
   }
 
@@ -150,6 +164,7 @@ public class IngestionStatsAndErrorsTaskReportData
            ", errorMsg='" + errorMsg + '\'' +
            ", segmentAvailabilityConfirmed=" + segmentAvailabilityConfirmed +
            ", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs +
+           ", recordsProcessed=" + recordsProcessed +
            '}';
   }
 }
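
Because the new constructor parameter flows through Jackson annotations, its serde behavior can be sanity-checked with a stand-in POJO that mirrors just the added field. This is an illustrative sketch, not the Druid class; the commit's real coverage is in `TaskReportSerdeTest` at the end of this diff:

```java
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Stand-in POJO mirroring only the field this commit adds to the report payload.
public class ReportPayloadSketch
{
  private final Map<String, Long> recordsProcessed;

  @JsonCreator
  public ReportPayloadSketch(@JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed)
  {
    this.recordsProcessed = recordsProcessed;  // nullable; batch tasks pass an empty map upstream
  }

  @JsonProperty
  public Map<String, Long> getRecordsProcessed()
  {
    return recordsProcessed;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    String json = mapper.writeValueAsString(new ReportPayloadSketch(Map.of("partition-a", 5789L)));
    System.out.println(json);  // {"recordsProcessed":{"partition-a":5789}}

    ReportPayloadSketch back = mapper.readValue(json, ReportPayloadSketch.class);
    System.out.println(back.getRecordsProcessed());  // {partition-a=5789}
  }
}
```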


@@ -620,7 +620,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
                 getTaskCompletionRowStats(),
                 errorMsg,
                 errorMsg == null,
-                0L
+                0L,
+                Collections.emptyMap()
             )
         )
     );


@@ -693,7 +693,8 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
                 getTaskCompletionRowStats(),
                 errorMsg,
                 segmentAvailabilityConfirmationCompleted,
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                Collections.emptyMap()
             )
         )
     );


@@ -579,7 +579,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
                 getTaskCompletionRowStats(),
                 errorMsg,
                 segmentAvailabilityConfirmationCompleted,
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                Collections.emptyMap()
             )
         )
     );


@@ -1240,7 +1240,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
                 rowStatsAndUnparseableEvents.lhs,
                 taskStatus.getErrorMsg(),
                 segmentAvailabilityConfirmed,
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                Collections.emptyMap()
             )
         )
     );


@@ -247,7 +247,8 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
                 getTaskCompletionRowStats(),
                 "",
                 false, // not applicable for parallel subtask
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                Collections.emptyMap()
             )
         )
     );


@@ -643,7 +643,8 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
                 getTaskCompletionRowStats(),
                 errorMsg,
                 false, // not applicable for parallel subtask
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                Collections.emptyMap()
            )
        )
    );


@@ -243,6 +243,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   private volatile CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequences;
   private volatile Throwable backgroundThreadException;
 
+  private final Map<PartitionIdType, Long> partitionsThroughput = new HashMap<>();
+
   public SeekableStreamIndexTaskRunner(
       final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task,
       @Nullable final InputRowParser<ByteBuffer> parser,
@@ -683,6 +685,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                 sequenceToCheckpoint = sequenceToUse;
               }
               isPersistRequired |= addResult.isPersistRequired();
+              partitionsThroughput.merge(record.getPartitionId(), 1L, Long::sum);
             } else {
               // Failure to allocate segment puts determinism at risk, bail out to be safe.
               // May want configurable behavior here at some point.
@@ -1126,12 +1129,18 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                 getTaskCompletionRowStats(),
                 errorMsg,
                 errorMsg == null,
-                handoffWaitMs
+                handoffWaitMs,
+                getPartitionStats()
             )
         )
     );
   }
 
+  private Map<String, Long> getPartitionStats()
+  {
+    return CollectionUtils.mapKeys(partitionsThroughput, key -> key.toString());
+  }
+
   private Map<String, Object> getTaskCompletionUnparseableEvents()
   {
     Map<String, Object> unparseableEventsMap = new HashMap<>();
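
`CollectionUtils.mapKeys` is a small Druid utility; it is used here because `PartitionIdType` is generic, so partition ids must be rendered as strings before the map can be serialized into the JSON report. A plain-Java equivalent of that step (an illustrative sketch, not the Druid utility itself):

```java
import java.util.HashMap;
import java.util.Map;

final class PartitionStatsSketch
{
  // Plain-Java equivalent of CollectionUtils.mapKeys(partitionsThroughput, key -> key.toString()):
  // rebuild the map with each generic partition id rendered as a string key.
  static <K> Map<String, Long> stringifyKeys(Map<K, Long> throughput)
  {
    Map<String, Long> result = new HashMap<>();
    throughput.forEach((partitionId, count) -> result.put(partitionId.toString(), count));
    return result;
  }

  public static void main(String[] args)
  {
    System.out.println(stringifyKeys(Map.of(0, 6L, 1, 2L)));  // e.g. {0=6, 1=2}
  }
}
```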


@@ -69,7 +69,8 @@ public class TaskReportSerdeTest
             ),
             "an error message",
             true,
-            1000L
+            1000L,
+            ImmutableMap.of("PartitionA", 5000L)
         )
     );
 
     String report1serialized = jsonMapper.writeValueAsString(report1);