Add groupId to metrics (#14402)

* Add group id as a dimension

* Revert changes

* Add to forking task runner

* Add missing metrics

* Fix indenting

* revert metrics

* Fix indentation
This commit is contained in:
George Shiqi Wu 2023-06-16 12:28:16 -04:00 committed by GitHub
parent 359bd63cc9
commit 64af9bfe5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 31 deletions

View File

@ -154,9 +154,9 @@ If SQL is enabled, the Broker will emit the following metrics for SQL.
|Metric|Description| Dimensions |Normal Value| |Metric|Description| Dimensions |Normal Value|
|------|-----------|---------------------------------------------------------|------------| |------|-----------|---------------------------------------------------------|------------|
|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |Always `1`.| |`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |Always `1`.|
|`ingest/segments/count`|Count of final segments created by job (includes tombstones). | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |At least `1`.| |`ingest/segments/count`|Count of final segments created by job (includes tombstones). | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |At least `1`.|
|`ingest/tombstones/count`|Count of tombstones created by job. | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).| |`ingest/tombstones/count`|Count of tombstones created by job. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).|
The `taskIngestionMode` dimension includes the following modes: The `taskIngestionMode` dimension includes the following modes:
* `APPEND`: a native ingestion job appending to existing segments * `APPEND`: a native ingestion job appending to existing segments
@ -206,26 +206,26 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|Metric|Description|Dimensions|Normal Value| |Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------| |------|-----------|----------|------------|
|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `tags`|0| |`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `tags`|0| |`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `tags`|0| |`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `tags`|Equal to the number of events per emission period.| |`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`|Your number of events with rollup.| |`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`, `groupId`|Your number of events with rollup.|
|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration.| |`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.| |`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.| |`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`, `tags`|0 or very low| |`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0 or very low|
|`ingest/persists/failed`|Number of persists that failed.|`dataSource`, `taskId`, `taskType`, `tags`|0| |`ingest/persists/failed`|Number of persists that failed.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, `taskId`, `taskType`, `tags`|0| |`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, `taskId`, `taskType`, `groupId`,`tags`|0|
|`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.| |`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.| |`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`, `tags`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.| |`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.|
|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, `taskType`, `tags`|1~3| |`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|1~3|
|`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. If the value is increasing but lag is low, Druid may not be receiving new data. This metric is reset as new tasks spawn up.|`dataSource`, `taskId`, `taskType`, `tags`|Greater than 0, depends on the time carried in event. | |`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. If the value is increasing but lag is low, Druid may not be receiving new data. This metric is reset as new tasks spawn up.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Greater than 0, depends on the time carried in event. |
|`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single digits. Should not be a very high number. | |`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s | |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds| |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on coordinator cycle time.| |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on coordinator cycle time.|
Note: If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0. Note: If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
@ -233,20 +233,20 @@ Note: If the JVM does not support CPU time measurement for the current thread, `
|Metric|Description| Dimensions |Normal Value| |Metric|Description| Dimensions |Normal Value|
|------|-----------|------------------------------------------------------------|------------| |------|-----------|------------------------------------------------------------|------------|
|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `taskStatus`, `tags`|Varies| |`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `groupId`, `taskStatus`, `tags`|Varies|
|`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`, `tags`|Varies| |`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Varies|
|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|< 1000 (subsecond)| |`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|< 1000 (subsecond)|
|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies from subsecond to a few seconds, based on action type.| |`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies from subsecond to a few seconds, based on action type.|
|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies| |`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies|
|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies| |`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies|
|`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in queue. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on the `batchAllocationWaitTime` and number of batches in queue.| |`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in queue. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on the `batchAllocationWaitTime` and number of batches in queue.|
|`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on action type and batch size.| |`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on action type and batch size.|
|`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on number of concurrent task actions.| |`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on number of concurrent task actions.|
|`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.| |`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.|
|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.| `dataSource`, `taskType`, `taskId`, `segmentAvailabilityConfirmed`, `tags`|Varies| |`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.| `dataSource`, `taskType`, `groupId`, `taskId`, `segmentAvailabilityConfirmed`, `tags`|Varies|
|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies| |`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `groupId`, `interval`, `tags`|Varies|
|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies| |`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `groupId`, `interval`, `tags`|Varies|
|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies| |`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `groupId`, `interval`, `tags`|Varies|
|`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.| `dataSource`|Varies| |`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.| `dataSource`|Varies|
|`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|`dataSource`|Varies| |`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|`dataSource`|Varies|
|`task/running/count`|Number of current running tasks. This metric is only available if the `TaskCountStatsMonitor` module is included.|`dataSource`|Varies| |`task/running/count`|Number of current running tasks. This metric is only available if the `TaskCountStatsMonitor` module is included.|`dataSource`|Varies|

View File

@ -56,7 +56,8 @@ public class TaskRealtimeMetricsMonitorBuilder
meters, meters,
ImmutableMap.of( ImmutableMap.of(
DruidMetrics.TASK_ID, new String[]{task.getId()}, DruidMetrics.TASK_ID, new String[]{task.getId()},
DruidMetrics.TASK_TYPE, new String[]{task.getType()} DruidMetrics.TASK_TYPE, new String[]{task.getType()},
DruidMetrics.GROUP_ID, new String[]{task.getGroupId()}
), ),
task.getContextValue(DruidMetrics.TAGS) task.getContextValue(DruidMetrics.TAGS)
); );

View File

@ -650,6 +650,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
.setDimension("dataSource", getDataSource()) .setDimension("dataSource", getDataSource())
.setDimension("taskType", getType()) .setDimension("taskType", getType())
.setDimension("taskId", getId()) .setDimension("taskId", getId())
.setDimension("groupId", getGroupId())
.setDimensionIfNotNull(DruidMetrics.TAGS, getContextValue(DruidMetrics.TAGS)) .setDimensionIfNotNull(DruidMetrics.TAGS, getContextValue(DruidMetrics.TAGS))
.setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted) .setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted)
.build("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs) .build("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs)

View File

@ -117,6 +117,7 @@ public class IndexTaskUtils
DruidMetrics.TAGS, DruidMetrics.TAGS,
task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS) task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS)
); );
metricBuilder.setDimensionIfNotNull(DruidMetrics.GROUP_ID, task.getGroupId());
} }
public static void setTaskDimensions(final ServiceMetricEvent.Builder metricBuilder, final AbstractTask task) public static void setTaskDimensions(final ServiceMetricEvent.Builder metricBuilder, final AbstractTask task)
@ -129,6 +130,7 @@ public class IndexTaskUtils
DruidMetrics.TAGS, DruidMetrics.TAGS,
task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS) task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS)
); );
metricBuilder.setDimensionIfNotNull(DruidMetrics.GROUP_ID, task.getGroupId());
} }
public static void setTaskStatusDimensions( public static void setTaskStatusDimensions(

View File

@ -314,6 +314,10 @@ public class ForkingTaskRunner
MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_TYPE, MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_TYPE,
task.getType() task.getType()
); );
command.addSystemProperty(
MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.GROUP_ID,
task.getGroupId()
);
command.addSystemProperty("druid.host", childHost); command.addSystemProperty("druid.host", childHost);

View File

@ -36,6 +36,8 @@ import java.util.Map;
public class IndexTaskUtilsTest public class IndexTaskUtilsTest
{ {
private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20); private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20);
private static final String GROUP_ID = "groupId123";
@Mock @Mock
private Task task; private Task task;
@Mock @Mock
@ -47,7 +49,9 @@ public class IndexTaskUtilsTest
{ {
metricBuilder = ServiceMetricEvent.builder(); metricBuilder = ServiceMetricEvent.builder();
Mockito.when(task.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS); Mockito.when(task.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS);
Mockito.when(task.getGroupId()).thenReturn(GROUP_ID);
Mockito.when(abstractTask.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS); Mockito.when(abstractTask.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS);
Mockito.when(abstractTask.getGroupId()).thenReturn(GROUP_ID);
} }
@Test @Test
@ -79,4 +83,34 @@ public class IndexTaskUtilsTest
IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask); IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
Assert.assertNull(metricBuilder.getDimension(DruidMetrics.TAGS)); Assert.assertNull(metricBuilder.getDimension(DruidMetrics.TAGS));
} }
@Test
public void testSetTaskDimensionsWithGroupIdShouldSetGroupId()
{
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
Assert.assertEquals(GROUP_ID, metricBuilder.getDimension(DruidMetrics.GROUP_ID));
}
@Test
public void testSetTaskDimensionsWithoutGroupIdShouldNotSetGroupId()
{
Mockito.when(task.getGroupId()).thenReturn(null);
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
Assert.assertNull(metricBuilder.getDimension(DruidMetrics.GROUP_ID));
}
@Test
public void testSetTaskDimensionsForAbstractTaskWithGroupIdShouldSetGroupId()
{
IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
Assert.assertEquals(GROUP_ID, metricBuilder.getDimension(DruidMetrics.GROUP_ID));
}
@Test
public void testSetTaskDimensionsForAbstractTaskWithoutGroupIdShouldNotSetGroupId()
{
Mockito.when(abstractTask.getGroupId()).thenReturn(null);
IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
Assert.assertNull(metricBuilder.getDimension(DruidMetrics.GROUP_ID));
}
} }

View File

@ -33,6 +33,7 @@ public class DruidMetrics
public static final String INTERVAL = "interval"; public static final String INTERVAL = "interval";
public static final String ID = "id"; public static final String ID = "id";
public static final String TASK_ID = "taskId"; public static final String TASK_ID = "taskId";
public static final String GROUP_ID = "groupId";
public static final String STATUS = "status"; public static final String STATUS = "status";
public static final String TASK_INGESTION_MODE = "taskIngestionMode"; public static final String TASK_INGESTION_MODE = "taskIngestionMode";