mirror of https://github.com/apache/druid.git
Skip generating task context reports for sub tasks (#17219)
* Skip task context for sub tasks * DRY a little + skip context for live report
This commit is contained in:
parent
65277b17a9
commit
e5d027ee1c
|
@ -930,7 +930,24 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
|
|||
null,
|
||||
null
|
||||
)
|
||||
),
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a map with the following keys and values:
|
||||
* {@link IngestionStatsAndErrorsTaskReport#REPORT_KEY} : {@link IngestionStatsAndErrorsTaskReport}.
|
||||
* {@link TaskContextReport#REPORT_KEY} : {@link TaskContextReport}.
|
||||
*/
|
||||
protected TaskReport.ReportMap buildIngestionStatsAndContextReport(
|
||||
IngestionState ingestionState,
|
||||
String errorMessage,
|
||||
Long segmentsRead,
|
||||
Long segmentsPublished
|
||||
)
|
||||
{
|
||||
return TaskReport.buildTaskReports(
|
||||
buildIngestionStatsTaskReport(ingestionState, errorMessage, segmentsRead, segmentsPublished),
|
||||
new TaskContextReport(getId(), getContext())
|
||||
);
|
||||
}
|
||||
|
@ -947,21 +964,33 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
|
|||
)
|
||||
{
|
||||
return TaskReport.buildTaskReports(
|
||||
new IngestionStatsAndErrorsTaskReport(
|
||||
getId(),
|
||||
new IngestionStatsAndErrors(
|
||||
ingestionState,
|
||||
getTaskCompletionUnparseableEvents(),
|
||||
getTaskCompletionRowStats(),
|
||||
errorMessage,
|
||||
segmentAvailabilityConfirmationCompleted,
|
||||
segmentAvailabilityWaitTimeMs,
|
||||
Collections.emptyMap(),
|
||||
segmentsRead,
|
||||
segmentsPublished
|
||||
)
|
||||
),
|
||||
new TaskContextReport(getId(), getContext())
|
||||
buildIngestionStatsTaskReport(ingestionState, errorMessage, segmentsRead, segmentsPublished)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create IngestionStatsAndErrorsTaskReport.
|
||||
*/
|
||||
private IngestionStatsAndErrorsTaskReport buildIngestionStatsTaskReport(
|
||||
IngestionState ingestionState,
|
||||
String errorMessage,
|
||||
Long segmentsRead,
|
||||
Long segmentsPublished
|
||||
)
|
||||
{
|
||||
return new IngestionStatsAndErrorsTaskReport(
|
||||
getId(),
|
||||
new IngestionStatsAndErrors(
|
||||
ingestionState,
|
||||
getTaskCompletionUnparseableEvents(),
|
||||
getTaskCompletionRowStats(),
|
||||
errorMessage,
|
||||
segmentAvailabilityConfirmationCompleted,
|
||||
segmentAvailabilityWaitTimeMs,
|
||||
Collections.emptyMap(),
|
||||
segmentsRead,
|
||||
segmentsPublished
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -677,7 +677,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
|
|||
|
||||
private TaskReport.ReportMap getTaskCompletionReports()
|
||||
{
|
||||
return buildIngestionStatsReport(ingestionState, errorMsg, null, null);
|
||||
return buildIngestionStatsAndContextReport(ingestionState, errorMsg, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -537,7 +537,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler, Pe
|
|||
|
||||
private void updateAndWriteCompletionReports(TaskToolbox toolbox, Long segmentsRead, Long segmentsPublished)
|
||||
{
|
||||
completionReports = buildIngestionStatsReport(ingestionState, errorMsg, segmentsRead, segmentsPublished);
|
||||
completionReports = buildIngestionStatsAndContextReport(ingestionState, errorMsg, segmentsRead, segmentsPublished);
|
||||
if (isStandAloneTask) {
|
||||
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
|
||||
}
|
||||
|
|
|
@ -1251,7 +1251,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
|
|||
*/
|
||||
private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus)
|
||||
{
|
||||
return buildIngestionStatsReport(
|
||||
return buildIngestionStatsAndContextReport(
|
||||
IngestionState.COMPLETED,
|
||||
taskStatus.getErrorMsg(),
|
||||
segmentsRead,
|
||||
|
|
Loading…
Reference in New Issue