Add stats segmentsRead and segmentsPublished to compaction task reports (#15947)

Changes:
- Add visibility into the number of segments read/published by each parallel compaction task
- Add new fields `segmentsRead`, `segmentsPublished` to `IngestionStatsAndErrorsTaskReportData`
- Update `ParallelIndexSupervisorTask` to populate the new stats
Adithya Chakilam 2024-03-06 22:07:23 -06:00 committed by GitHub
parent ebf3bdd909
commit 564c44ed85
14 changed files with 193 additions and 14 deletions
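
For a sense of how these stats surface to API clients, the sketch below fetches a finished compaction task's completion report and prints the two new fields. It is illustrative only: the Overlord report endpoint and the `ingestionStatsAndErrors` report key reflect existing Druid behavior rather than anything added in this commit, and the Overlord URL and task id are placeholders.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CompactionReportStats
{
  public static void main(String[] args) throws Exception
  {
    String overlord = "http://localhost:8081";                // placeholder
    String taskId = "compact_wikipedia_2024-03-06T22:00:00Z"; // placeholder

    // Fetch the task's completion reports from the Overlord.
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(overlord + "/druid/indexer/v1/task/" + taskId + "/reports"))
        .GET()
        .build();
    String body = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString())
        .body();

    // The new fields live in the ingestionStatsAndErrors payload and are omitted
    // entirely when null (JsonInclude.Include.NON_NULL on the report data class).
    JsonNode payload = new ObjectMapper()
        .readTree(body)
        .path("ingestionStatsAndErrors")
        .path("payload");
    JsonNode read = payload.path("segmentsRead");
    JsonNode published = payload.path("segmentsPublished");
    System.out.println("segmentsRead      = "
        + (read.isMissingNode() ? "n/a" : String.valueOf(read.asLong())));
    System.out.println("segmentsPublished = "
        + (published.isMissingNode() ? "n/a" : String.valueOf(published.asLong())));
  }
}
```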

File: ingestion task documentation (task reports section)

@@ -155,6 +155,14 @@ For some task types, the indexing task can wait for the newly ingested segments
|`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to become available for query after ingestion completed.|
|`recordsProcessed`|Partitions processed by an ingestion task, with the count of records processed from each partition.|
#### Compaction task segment info fields
|Field|Description|
|---|---|
|`segmentsRead`|Number of segments read by a compaction task that runs with more than one subtask.|
|`segmentsPublished`|Number of segments published by a compaction task that runs with more than one subtask.|
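
Client code that has already deserialized the per-task report payloads into `IngestionStatsAndErrorsTaskReportData` can total the new stats with the getters added in this change. A minimal sketch, assuming the list of payloads has been fetched elsewhere and treating a null value as "the task did not report the stat":

```java
import java.util.List;
import java.util.Objects;

import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;

public class CompactionSegmentTotals
{
  // Sums segmentsRead across report payloads, skipping tasks that did not
  // report the stat (the getter returns null in that case).
  public static long totalSegmentsRead(List<IngestionStatsAndErrorsTaskReportData> payloads)
  {
    return payloads.stream()
                   .map(IngestionStatsAndErrorsTaskReportData::getSegmentsRead)
                   .filter(Objects::nonNull)
                   .mapToLong(Long::longValue)
                   .sum();
  }

  // Same aggregation for segmentsPublished.
  public static long totalSegmentsPublished(List<IngestionStatsAndErrorsTaskReportData> payloads)
  {
    return payloads.stream()
                   .map(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished)
                   .filter(Objects::nonNull)
                   .mapToLong(Long::longValue)
                   .sum();
  }
}
```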
### Live report
When a task is running, a live report containing the ingestion state, unparseable events, and moving averages of the number of events processed over 1-minute, 5-minute, and 15-minute windows can be retrieved at:

File: IngestionStatsAndErrorsTaskReport.java

@@ -57,7 +57,7 @@ public class IngestionStatsAndErrorsTaskReport implements TaskReport
}
@Override
public Object getPayload()
public IngestionStatsAndErrorsTaskReportData getPayload()
{
return payload;
}

File: IngestionStatsAndErrorsTaskReportData.java

@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.IngestionState;
@@ -50,6 +51,11 @@ public class IngestionStatsAndErrorsTaskReportData
@JsonProperty
private Map<String, Long> recordsProcessed;
@JsonProperty
private Long segmentsRead;
@JsonProperty
private Long segmentsPublished;
public IngestionStatsAndErrorsTaskReportData(
@JsonProperty("ingestionState") IngestionState ingestionState,
@JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
@@ -57,7 +63,9 @@ public class IngestionStatsAndErrorsTaskReportData
@JsonProperty("errorMsg") @Nullable String errorMsg,
@JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed,
@JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs,
@JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed
@JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed,
@Nullable @JsonProperty("segmentsRead") Long segmentsRead,
@Nullable @JsonProperty("segmentsPublished") Long segmentsPublished
)
{
this.ingestionState = ingestionState;
@@ -67,6 +75,8 @@ public class IngestionStatsAndErrorsTaskReportData
this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed;
this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs;
this.recordsProcessed = recordsProcessed;
this.segmentsRead = segmentsRead;
this.segmentsPublished = segmentsPublished;
}
@JsonProperty
@@ -113,6 +123,22 @@ public class IngestionStatsAndErrorsTaskReportData
return recordsProcessed;
}
@JsonProperty
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public Long getSegmentsRead()
{
return segmentsRead;
}
@JsonProperty
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public Long getSegmentsPublished()
{
return segmentsPublished;
}
public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports(
Map<String, TaskReport> taskReports
)
@@ -137,7 +163,9 @@ public class IngestionStatsAndErrorsTaskReportData
Objects.equals(getErrorMsg(), that.getErrorMsg()) &&
Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) &&
Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()) &&
Objects.equals(getRecordsProcessed(), that.getRecordsProcessed());
Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()) &&
Objects.equals(getSegmentsRead(), that.getSegmentsRead()) &&
Objects.equals(getSegmentsPublished(), that.getSegmentsPublished());
}
@Override
@@ -150,7 +178,9 @@ public class IngestionStatsAndErrorsTaskReportData
getErrorMsg(),
isSegmentAvailabilityConfirmed(),
getSegmentAvailabilityWaitTimeMs(),
getRecordsProcessed()
getRecordsProcessed(),
getSegmentsRead(),
getSegmentsPublished()
);
}
@@ -165,6 +195,8 @@ public class IngestionStatsAndErrorsTaskReportData
", segmentAvailabilityConfirmed=" + segmentAvailabilityConfirmed +
", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs +
", recordsProcessed=" + recordsProcessed +
", segmentsRead=" + segmentsRead +
", segmentsPublished=" + segmentsPublished +
'}';
}
}

File: AppenderatorDriverRealtimeIndexTask.java

@@ -621,7 +621,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
errorMsg,
errorMsg == null,
0L,
Collections.emptyMap()
Collections.emptyMap(),
null,
null
)
)
);

File: HadoopIndexTask.java

@@ -694,7 +694,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
errorMsg,
segmentAvailabilityConfirmationCompleted,
segmentAvailabilityWaitTimeMs,
Collections.emptyMap()
Collections.emptyMap(),
null,
null
)
)
);

File: IndexTask.java

@@ -602,7 +602,9 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
errorMsg,
segmentAvailabilityConfirmationCompleted,
segmentAvailabilityWaitTimeMs,
Collections.emptyMap()
Collections.emptyMap(),
null,
null
)
)
);

File: ParallelIndexSupervisorTask.java

@@ -203,6 +203,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
private IngestionState ingestionState;
private Map<String, TaskReport> completionReports;
private Long segmentsRead;
private Long segmentsPublished;
private final boolean isCompactionTask;
@@ -643,6 +645,14 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
if (state.isSuccess()) {
//noinspection ConstantConditions
publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
if (isCompactionTask) {
// Populate segmentsRead only for compaction tasks
segmentsRead = parallelSinglePhaseRunner.getReports()
.values()
.stream()
.mapToLong(report -> report.getOldSegments().size()).sum();
}
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
waitForSegmentAvailability(parallelSinglePhaseRunner.getReports());
}
@@ -1189,6 +1199,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
} else {
throw new ISE("Failed to publish segments");
}
segmentsPublished = (long) newSegments.size();
}
private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
@@ -1245,7 +1257,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
taskStatus.getErrorMsg(),
segmentAvailabilityConfirmed,
segmentAvailabilityWaitTimeMs,
Collections.emptyMap()
Collections.emptyMap(),
segmentsRead,
segmentsPublished
)
)
);
@@ -1629,6 +1643,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
long totalSegmentsRead = 0L;
for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
@@ -1639,6 +1654,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
Long segmentsReadFromPartition = ((IngestionStatsAndErrorsTaskReport)
taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
).getPayload().getSegmentsRead();
if (segmentsReadFromPartition != null) {
totalSegmentsRead += segmentsReadFromPartition;
}
}
RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
@@ -1647,6 +1669,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
if (totalSegmentsRead > 0) {
segmentsRead = totalSegmentsRead;
}
return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}

File: PartialSegmentGenerateTask.java

@@ -37,6 +37,8 @@ import org.apache.druid.indexing.common.task.SequenceNameFunction;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -125,7 +127,7 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
toolbox.getIndexingTmpDir()
);
Map<String, TaskReport> taskReport = getTaskCompletionReports();
Map<String, TaskReport> taskReport = getTaskCompletionReports(getNumSegmentsRead(inputSource));
taskClient.report(createGeneratedPartitionsReport(toolbox, segments, taskReport));
@@ -149,6 +151,18 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
Map<String, TaskReport> taskReport
);
private Long getNumSegmentsRead(InputSource inputSource)
{
if (inputSource instanceof DruidInputSource) {
List<WindowedSegmentId> segments = ((DruidInputSource) inputSource).getSegmentIds();
if (segments != null) {
return (long) segments.size();
}
}
return null;
}
private List<DataSegment> generateSegments(
final TaskToolbox toolbox,
final ParallelIndexSupervisorTaskClient taskClient,
@@ -236,7 +250,7 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
*/
private Map<String, TaskReport> getTaskCompletionReports()
private Map<String, TaskReport> getTaskCompletionReports(Long segmentsRead)
{
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
@@ -248,7 +262,9 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
"",
false, // not applicable for parallel subtask
segmentAvailabilityWaitTimeMs,
Collections.emptyMap()
Collections.emptyMap(),
segmentsRead,
null
)
)
);

File: SinglePhaseSubTask.java

@@ -644,7 +644,9 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
errorMsg,
false, // not applicable for parallel subtask
segmentAvailabilityWaitTimeMs,
Collections.emptyMap()
Collections.emptyMap(),
null,
null
)
)
);

File: SeekableStreamIndexTaskRunner.java

@@ -1130,7 +1130,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
errorMsg,
errorMsg == null,
handoffWaitMs,
getPartitionStats()
getPartitionStats(),
null,
null
)
)
);

File: CompactionTaskParallelRunTest.java

@@ -250,6 +250,18 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
List<IngestionStatsAndErrorsTaskReportData> reports = getIngestionReports();
Assert.assertEquals(reports.size(), 3); // since three index tasks are run by single compaction task
// this test reads 3 segments and publishes 6 segments
Assert.assertEquals(
3,
reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsRead).sum()
);
Assert.assertEquals(
6,
reports.stream()
.mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished)
.sum()
);
}
@Test

File: TaskReportSerdeTest.java

@@ -70,7 +70,9 @@ public class TaskReportSerdeTest
"an error message",
true,
1000L,
ImmutableMap.of("PartitionA", 5000L)
ImmutableMap.of("PartitionA", 5000L),
5L,
10L
)
);
String report1serialized = jsonMapper.writeValueAsString(report1);
@@ -127,6 +129,8 @@ public class TaskReportSerdeTest
"an error message",
true,
1000L,
null,
null,
null
)
);

File: ITCompactionTaskTest.java

@@ -22,6 +22,7 @@ package org.apache.druid.tests.indexer;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -65,6 +66,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private static final String SEGMENT_METADATA_QUERY_RESOURCE = "/indexer/segment_metadata_query.json";
private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
private static final String PARALLEL_COMPACTION_TASK = "/indexer/wikipedia_compaction_task_parallel.json";
private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY = "/indexer/wikipedia_compaction_task_with_segment_granularity.json";
private static final String COMPACTION_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_compaction_task_with_granularity_spec.json";
@@ -138,6 +140,54 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
}
}
@Test
public void testParallelHashedCompaction() throws Exception
{
try (final Closeable ignored = unloader(fullDatasourceName)) {
loadData(INDEX_TASK, fullDatasourceName);
// 4 segments across 2 days
checkNumberOfSegments(4);
List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
expectedIntervalAfterCompaction.sort(null);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
jsonMapper.writeValueAsString("0")
);
queryHelper.testQueriesFromString(queryResponseTemplate);
String taskId = compactData(PARALLEL_COMPACTION_TASK, null, null);
// The original 4 segments should be compacted into 2 new segments
checkNumberOfSegments(2);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 2);
checkCompactionIntervals(expectedIntervalAfterCompaction);
Map<String, IngestionStatsAndErrorsTaskReport> reports = indexer.getTaskReport(taskId);
Assert.assertTrue(reports != null && reports.size() > 0);
Assert.assertEquals(2,
reports.values()
.stream()
.mapToLong(r -> ((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsPublished())
.sum()
);
Assert.assertEquals(4,
reports.values()
.stream()
.mapToLong(r -> ((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsRead())
.sum()
);
}
}
@Test
public void testCompactionWithSegmentGranularityAndQueryGranularityInGranularitySpec() throws Exception
{

File: wikipedia_compaction_task_parallel.json (new file)

@@ -0,0 +1,22 @@
{
"type" : "compact",
"dataSource" : "%%DATASOURCE%%",
"ioConfig" : {
"type": "compact",
"inputSpec": {
"type": "interval",
"interval": "2013-08-31/2013-09-02"
}
},
"tuningConfig": {
"type": "index_parallel",
"partitionsSpec": {
"type": "hashed"
},
"forceGuaranteedRollup": true,
"maxNumConcurrentSubTasks": 3
},
"context" : {
"storeCompactionState" : true
}
}