diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 1cb3d36ad75..46be219d878 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -27,8 +27,10 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; import org.apache.druid.indexing.common.task.BatchAppenderators; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.InputSourceProcessor; @@ -40,7 +42,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskIn import org.apache.druid.indexing.firehose.WindowedSegmentId; import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; -import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMeters; @@ -48,7 +49,6 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.realtime.FireDepartment; import org.apache.druid.segment.realtime.FireDepartmentMetrics; -import org.apache.druid.segment.realtime.RealtimeMetricsMonitor; import org.apache.druid.segment.realtime.appenderator.Appenderator; import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver; import org.apache.druid.segment.realtime.appenderator.SegmentAllocator; @@ -179,9 +179,10 @@ abstract class PartialSegmentGenerateTask e final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters(); - RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor( - Collections.singletonList(fireDepartmentForMetrics), - Collections.singletonMap(DruidMetrics.TASK_ID, new String[]{getId()}) + TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build( + this, + fireDepartmentForMetrics, + buildSegmentsMeters ); toolbox.addMonitor(metricsMonitor); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index bbd3f2964b6..9e0c8d80c83 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -36,10 +36,12 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.BatchAppenderators; @@ -53,7 +55,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMeters; @@ -63,7 +64,6 @@ import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.realtime.FireDepartment; import org.apache.druid.segment.realtime.FireDepartmentMetrics; -import org.apache.druid.segment.realtime.RealtimeMetricsMonitor; import org.apache.druid.segment.realtime.appenderator.Appenderator; import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver; @@ -373,9 +373,10 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null); final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); - RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor( - Collections.singletonList(fireDepartmentForMetrics), - Collections.singletonMap(DruidMetrics.TASK_ID, new String[]{getId()}) + TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build( + this, + fireDepartmentForMetrics, + rowIngestionMeters ); toolbox.addMonitor(metricsMonitor);