From 64af9bfe5bfcbe81e865c7c2bb3b898297b1d76c Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Fri, 16 Jun 2023 12:28:16 -0400 Subject: [PATCH] Add groupId to metrics (#14402) * Add group id as a dimension * Revert changes * Add to forking task runner * Add missing metrics * Fix indenting * revert metrics * Fix indentation --- docs/operations/metrics.md | 60 +++++++++---------- .../TaskRealtimeMetricsMonitorBuilder.java | 3 +- .../common/task/AbstractBatchIndexTask.java | 1 + .../indexing/common/task/IndexTaskUtils.java | 2 + .../indexing/overlord/ForkingTaskRunner.java | 4 ++ .../common/task/IndexTaskUtilsTest.java | 34 +++++++++++ .../org/apache/druid/query/DruidMetrics.java | 1 + 7 files changed, 74 insertions(+), 31 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index cb786abb823..16aaca90837 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -154,9 +154,9 @@ If SQL is enabled, the Broker will emit the following metrics for SQL. |Metric|Description| Dimensions |Normal Value| |------|-----------|---------------------------------------------------------|------------| -|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |Always `1`.| -|`ingest/segments/count`|Count of final segments created by job (includes tombstones). | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |At least `1`.| -|`ingest/tombstones/count`|Count of tombstones created by job. | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).| +|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. 
| `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |Always `1`.| +|`ingest/segments/count`|Count of final segments created by job (includes tombstones). | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |At least `1`.| +|`ingest/tombstones/count`|Count of tombstones created by job. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).| The `taskIngestionMode` dimension includes the following modes: * `APPEND`: a native ingestion job appending to existing segments @@ -206,26 +206,26 @@ batch ingestion emit the following metrics. These metrics are deltas for each em |Metric|Description|Dimensions|Normal Value| |------|-----------|----------|------------| -|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `tags`|0| -|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `tags`|0| -|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `tags`|0| -|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `tags`|Equal to the number of events per emission period.| -|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`|Your number of events with rollup.| -|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration.| -|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. 
Generally a few minutes at most.| -|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.| -|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`, `tags`|0 or very low| -|`ingest/persists/failed`|Number of persists that failed.|`dataSource`, `taskId`, `taskType`, `tags`|0| -|`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, `taskId`, `taskType`, `tags`|0| -|`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.| -|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.| -|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`, `tags`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.| -|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, `taskType`, `tags`|1~3| -|`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. If the value is increasing but lag is low, Druid may not be receiving new data. This metric is reset as new tasks spawn up.|`dataSource`, `taskId`, `taskType`, `tags`|Greater than 0, depends on the time carried in event. 
| +|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0| +|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0| +|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0| +|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.| +|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`, `groupId`|Your number of events with rollup.| +|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration.| +|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.| +|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.| +|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0 or very low| +|`ingest/persists/failed`|Number of persists that failed.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0| +|`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0| +|`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. 
Generally a few minutes at most.| +|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.| +|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.| +|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|1~3| +|`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. If the value is increasing but lag is low, Druid may not be receiving new data. This metric is reset as new tasks spawn up.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Greater than 0, depends on the time carried in event. | |`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single digits. Should not be a very high number. | |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s | |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds| -|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on coordinator cycle time.| +|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on coordinator cycle time.| Note: If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0. 
@@ -233,20 +233,20 @@ Note: If the JVM does not support CPU time measurement for the current thread, ` |Metric|Description| Dimensions |Normal Value| |------|-----------|------------------------------------------------------------|------------| -|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `taskStatus`, `tags`|Varies| -|`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`, `tags`|Varies| -|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|< 1000 (subsecond)| -|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies from subsecond to a few seconds, based on action type.| -|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies| -|`task/action/failed/count`|Number of task actions that failed during the emission period. 
Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies| +|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `groupId`, `taskStatus`, `tags`|Varies| +|`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Varies| +|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|< 1000 (subsecond)| +|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies from subsecond to a few seconds, based on action type.| +|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies| +|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies| |`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in queue. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on the `batchAllocationWaitTime` and number of batches in queue.| |`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. 
Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on action type and batch size.| |`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on number of concurrent task actions.| |`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.| -|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.| `dataSource`, `taskType`, `taskId`, `segmentAvailabilityConfirmed`, `tags`|Varies| -|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies| -|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies| -|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies| +|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.| `dataSource`, `taskType`, `groupId`, `taskId`, `segmentAvailabilityConfirmed`, `tags`|Varies| +|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `groupId`, `interval`, `tags`|Varies| +|`segment/moved/bytes`|Size in bytes of segments moved/archived via the 
Move Task.| `dataSource`, `taskId`, `taskType`, `groupId`, `interval`, `tags`|Varies| +|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `groupId`, `interval`, `tags`|Varies| |`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.| `dataSource`|Varies| |`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|`dataSource`|Varies| |`task/running/count`|Number of current running tasks. This metric is only available if the `TaskCountStatsMonitor` module is included.|`dataSource`|Varies| diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java index a07ad4eaad9..abd95169073 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java @@ -56,7 +56,8 @@ public class TaskRealtimeMetricsMonitorBuilder meters, ImmutableMap.of( DruidMetrics.TASK_ID, new String[]{task.getId()}, - DruidMetrics.TASK_TYPE, new String[]{task.getType()} + DruidMetrics.TASK_TYPE, new String[]{task.getType()}, + DruidMetrics.GROUP_ID, new String[]{task.getGroupId()} ), task.getContextValue(DruidMetrics.TAGS) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index a4cd183c01b..11bcfb6de78 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -650,6 +650,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask .setDimension("dataSource", getDataSource()) .setDimension("taskType", getType()) .setDimension("taskId", getId()) + .setDimension("groupId", getGroupId()) .setDimensionIfNotNull(DruidMetrics.TAGS, getContextValue(DruidMetrics.TAGS)) .setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted) .build("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java index 05be8c19410..cd7a52f7728 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java @@ -117,6 +117,7 @@ public class IndexTaskUtils DruidMetrics.TAGS, task.>getContextValue(DruidMetrics.TAGS) ); + metricBuilder.setDimensionIfNotNull(DruidMetrics.GROUP_ID, task.getGroupId()); } public static void setTaskDimensions(final ServiceMetricEvent.Builder metricBuilder, final AbstractTask task) @@ -129,6 +130,7 @@ public class IndexTaskUtils DruidMetrics.TAGS, task.>getContextValue(DruidMetrics.TAGS) ); + metricBuilder.setDimensionIfNotNull(DruidMetrics.GROUP_ID, task.getGroupId()); } public static void setTaskStatusDimensions( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 8df3a69eee2..c2f2c0363ed 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -314,6 +314,10 @@ public 
class ForkingTaskRunner MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_TYPE, task.getType() ); + command.addSystemProperty( + MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.GROUP_ID, + task.getGroupId() + ); command.addSystemProperty("druid.host", childHost); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java index 8543f893fd9..025b3ee09ea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java @@ -36,6 +36,8 @@ import java.util.Map; public class IndexTaskUtilsTest { private static final Map METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20); + + private static final String GROUP_ID = "groupId123"; @Mock private Task task; @Mock @@ -47,7 +49,9 @@ public class IndexTaskUtilsTest { metricBuilder = ServiceMetricEvent.builder(); Mockito.when(task.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS); + Mockito.when(task.getGroupId()).thenReturn(GROUP_ID); Mockito.when(abstractTask.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS); + Mockito.when(abstractTask.getGroupId()).thenReturn(GROUP_ID); } @Test @@ -79,4 +83,34 @@ public class IndexTaskUtilsTest IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask); Assert.assertNull(metricBuilder.getDimension(DruidMetrics.TAGS)); } + + @Test + public void testSetTaskDimensionsWithGroupIdShouldSetGroupId() + { + IndexTaskUtils.setTaskDimensions(metricBuilder, task); + Assert.assertEquals(GROUP_ID, metricBuilder.getDimension(DruidMetrics.GROUP_ID)); + } + + @Test + public void testSetTaskDimensionsWithoutGroupIdShouldNotSetGroupId() + { + Mockito.when(task.getGroupId()).thenReturn(null); + IndexTaskUtils.setTaskDimensions(metricBuilder, task); + 
Assert.assertNull(metricBuilder.getDimension(DruidMetrics.GROUP_ID)); + } + + @Test + public void testSetTaskDimensionsForAbstractTaskWithGroupIdShouldSetGroupId() + { + IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask); + Assert.assertEquals(GROUP_ID, metricBuilder.getDimension(DruidMetrics.GROUP_ID)); + } + + @Test + public void testSetTaskDimensionsForAbstractTaskWithoutGroupIdShouldNotSetGroupId() + { + Mockito.when(abstractTask.getGroupId()).thenReturn(null); + IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask); + Assert.assertNull(metricBuilder.getDimension(DruidMetrics.GROUP_ID)); + } } diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java index 50482ce7f44..5caf90d3fda 100644 --- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java @@ -33,6 +33,7 @@ public class DruidMetrics public static final String INTERVAL = "interval"; public static final String ID = "id"; public static final String TASK_ID = "taskId"; + public static final String GROUP_ID = "groupId"; public static final String STATUS = "status"; public static final String TASK_INGESTION_MODE = "taskIngestionMode";