Allow users to add additional metadata to ingestion metrics (#13760)

* Allow users to add additional metadata to ingestion metrics

When submitting an ingestion spec, users may pass a map of metadata
in the ingestion spec config that will be added to ingestion metrics.

This will make it possible for operators to tag metrics with other
metadata that doesn't necessarily line up with the existing tags
like taskId.

Druid clusters that ingest these metrics can take advantage of the
nested data columns feature to process this additional metadata.

* rename to tags

* docs

* tests

* fix test

* make code cov happy

* checkstyle
This commit is contained in:
Suneet Saldanha 2023-02-08 18:07:23 -08:00 committed by GitHub
parent d7a15be9bc
commit 714ac07b52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 466 additions and 109 deletions

View File

@ -149,6 +149,14 @@ public class ServiceMetricEvent implements Event
return this;
}
public Builder setDimensionIfNotNull(String dim, Object value)
{
if (value != null) {
userDims.put(dim, value);
}
return this;
}
public Builder setDimension(String dim, Object value)
{
userDims.put(dim, value);

View File

@ -27,8 +27,10 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
/**
*
*/
public class ServiceMetricEventTest
{
@ -291,4 +293,25 @@ public class ServiceMetricEventTest
ServiceMetricEvent.builder().build("foo", 0 / 0f);
}
@Test
public void testSetDimensionIfNotNullSetsNonNullDimension()
{
Map<String, String> userDimMap = ImmutableMap.of("k1", "v1", "k2", "v2");
ServiceMetricEvent target = ServiceMetricEvent.builder()
.setDimensionIfNotNull("userDimMap", userDimMap)
.build("foo", 1)
.build("service", "host");
Assert.assertEquals(userDimMap, target.getUserDims().get("userDimMap"));
}
@Test
public void testSetDimensionIfNotNullShouldNotSetNullDimension()
{
ServiceMetricEvent target = ServiceMetricEvent.builder()
.setDimensionIfNotNull("userDimMap", null)
.build("foo", 1)
.build("service", "host");
Assert.assertTrue(target.getUserDims().isEmpty());
Assert.assertNull(target.getUserDims().get("userDimMap"));
}
}

View File

@ -152,11 +152,11 @@ If SQL is enabled, the Broker will emit the following metrics for SQL.
## General native ingestion metrics
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. |`dataSource`, `taskId`, `taskType`, `taskIngestionMode`|Always `1`.|
|`ingest/segments/count`|Count of final segments created by job (includes tombstones). |`dataSource`, `taskId`, `taskType`, `taskIngestionMode`|At least `1`.|
|`ingest/tombstones/count`|Count of tombstones created by job. |`dataSource`, `taskId`, `taskType`, `taskIngestionMode`|Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).|
|Metric|Description| Dimensions |Normal Value|
|------|-----------|---------------------------------------------------------|------------|
|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |Always `1`.|
|`ingest/segments/count`|Count of final segments created by job (includes tombstones). | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |At least `1`.|
|`ingest/tombstones/count`|Count of tombstones created by job. | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).|
The `taskIngestionMode` dimension includes the following modes:
* `APPEND`: a native ingestion job appending to existing segments
@ -167,12 +167,15 @@ The mode is decided using the values
of the `isAppendToExisting` and `isDropExisting` flags in the
task's `IOConfig` as follows:
|`isAppendToExisting` | `isDropExisting` | mode |
|---------------------|-------------------|------|
`true` | `false` | `APPEND`|
`true` | `true ` | Invalid combination, exception thrown. |
`false` | `false` | `REPLACE_LEGACY` (this is the default for native batch ingestion). |
`false` | `true` | `REPLACE`|
| `isAppendToExisting` | `isDropExisting` | mode |
|----------------------|-------------------|------|
| `true` | `false` | `APPEND`|
| `true` | `true ` | Invalid combination, exception thrown. |
| `false` | `false` | `REPLACE_LEGACY` (this is the default for native batch ingestion). |
| `false` | `true` | `REPLACE`|
The `tags` dimension is reported only for metrics emitted from ingestion tasks whose ingest spec specifies the `tags`
field in the `context` field of the ingestion spec. `tags` is expected to be a map of string to object.
### Ingestion metrics for Kafka
@ -180,10 +183,10 @@ These metrics apply to the [Kafka indexing service](../development/extensions-co
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, should not be a very high number. |
|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, should not be a very high number. |
|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, should not be a very high number. |
|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`|Greater than 0, should not be a very high number. |
|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. |
|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. |
|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. |
|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, should not be a very high number. |
### Ingestion metrics for Kinesis
@ -191,10 +194,10 @@ These metrics apply to the [Kinesis indexing service](../development/extensions-
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, up to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, up to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, up to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`|Greater than 0, up to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
### Other ingestion metrics
@ -203,26 +206,26 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`|0|
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`|0|
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`|0|
|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`|Equal to the number of events per emission period.|
|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `tags`|0|
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `tags`|0|
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `tags`|0|
|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `tags`|Equal to the number of events per emission period.|
|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`|Your number of events with rollup.|
|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`|Depends on configuration.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|`dataSource`, `taskId`, `taskType`|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`|0 or very low|
|`ingest/persists/failed`|Number of persists that failed.|`dataSource`, `taskId`, `taskType`|0|
|`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, `taskId`, `taskType`|0|
|`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`|Depends on configuration. Generally a few minutes at most.|
|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`|Depends on configuration. Generally a few minutes at most.|
|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.|
|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, `taskType`|1~3|
|`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. |`dataSource`, `taskId`, `taskType`|Greater than 0, depends on the time carried in event. |
|`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator.|`dataSource`|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`| < 10 seconds|
|`ingest/handoff/time`|Total time taken for each set of segments handed off.|`dataSource`, `taskId`, `taskType`|Depends on coordinator cycle time.|
|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`, `tags`|0 or very low|
|`ingest/persists/failed`|Number of persists that failed.|`dataSource`, `taskId`, `taskType`, `tags`|0|
|`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, `taskId`, `taskType`, `tags`|0|
|`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`, `tags`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.|
|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, `taskType`, `tags`|1~3|
|`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. |`dataSource`, `taskId`, `taskType`, `tags`|Greater than 0, depends on the time carried in event. |
|`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total time taken for each set of segments handed off.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on coordinator cycle time.|
Note: If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
@ -230,19 +233,20 @@ Note: If the JVM does not support CPU time measurement for the current thread, `
|Metric|Description| Dimensions |Normal Value|
|------|-----------|------------------------------------------------------------|------------|
|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `taskStatus`|Varies|
|`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`|Varies|
|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`|< 1000 (subsecond)|
|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies from subsecond to a few seconds, based on action type.|
|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `taskStatus`, `tags`|Varies|
|`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`, `tags`|Varies|
|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|< 1000 (subsecond)|
|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies from subsecond to a few seconds, based on action type.|
|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies|
|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies|
|`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in queue. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on the `batchAllocationWaitTime` and number of batches in queue.|
|`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on action type and batch size.|
|`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on number of concurrent task actions.|
|`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.|
|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.| `dataSource`, `taskType`, `taskId`, `segmentAvailabilityConfirmed`, `tags`|Varies|
|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies|
|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies|
|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies|
|`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.| `dataSource`|Varies|
|`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|`dataSource`|Varies|
|`task/running/count`|Number of current running tasks. This metric is only available if the `TaskCountStatsMonitor` module is included.|`dataSource`|Varies|
@ -253,7 +257,6 @@ Note: If the JVM does not support CPU time measurement for the current thread, `
|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the `TaskSlotCountStatsMonitor` module is included.| `category`|Varies|
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the `TaskSlotCountStatsMonitor` module is included.| `category`|Varies|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the `TaskSlotCountStatsMonitor` module is included.| `category`|Varies|
|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.| `dataSource`, `taskType`, `taskId`, `segmentAvailabilityConfirmed` |Varies|
|`worker/task/failed/count`|Number of failed tasks run on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included, and is only supported for middleManager nodes.| `category`, `workerVersion`|Varies|
|`worker/task/success/count`|Number of successful tasks run on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included, and is only supported for middleManager nodes.| `category`,`workerVersion`|Varies|
|`worker/taskSlot/idle/count`|Number of idle task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included, and is only supported for middleManager nodes.| `category`, `workerVersion`|Varies|

View File

@ -45,7 +45,11 @@ public class TaskRealtimeMetricsMonitorBuilder
);
}
public static TaskRealtimeMetricsMonitor build(Task task, FireDepartment fireDepartment, RowIngestionMeters meters)
public static TaskRealtimeMetricsMonitor build(
Task task,
FireDepartment fireDepartment,
RowIngestionMeters meters
)
{
return new TaskRealtimeMetricsMonitor(
fireDepartment,
@ -53,7 +57,8 @@ public class TaskRealtimeMetricsMonitorBuilder
ImmutableMap.of(
DruidMetrics.TASK_ID, new String[]{task.getId()},
DruidMetrics.TASK_TYPE, new String[]{task.getType()}
)
),
task.getContextValue(DruidMetrics.TAGS)
);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import javax.annotation.Nullable;
import java.util.Map;
/**
@ -46,6 +47,8 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
private final FireDepartment fireDepartment;
private final RowIngestionMeters rowIngestionMeters;
private final Map<String, String[]> dimensions;
@Nullable
private final Map<String, Object> metricTags;
private FireDepartmentMetrics previousFireDepartmentMetrics;
private RowIngestionMetersTotals previousRowIngestionMetersTotals;
@ -53,12 +56,14 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
public TaskRealtimeMetricsMonitor(
FireDepartment fireDepartment,
RowIngestionMeters rowIngestionMeters,
Map<String, String[]> dimensions
Map<String, String[]> dimensions,
@Nullable Map<String, Object> metricTags
)
{
this.fireDepartment = fireDepartment;
this.rowIngestionMeters = rowIngestionMeters;
this.dimensions = ImmutableMap.copyOf(dimensions);
this.metricTags = metricTags;
previousFireDepartmentMetrics = new FireDepartmentMetrics();
previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0, 0);
}
@ -80,6 +85,7 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
thrownAway
);
}
builder.setDimensionIfNotNull(DruidMetrics.TAGS, metricTags);
emitter.emit(builder.build("ingest/events/thrownAway", thrownAway));
final long unparseable = rowIngestionMetersTotals.getUnparseable()

View File

@ -60,6 +60,7 @@ import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
@ -691,6 +692,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
.setDimension("dataSource", getDataSource())
.setDimension("taskType", getType())
.setDimension("taskId", getId())
.setDimensionIfNotNull(DruidMetrics.TAGS, getContextValue(DruidMetrics.TAGS))
.setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted)
.build("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs)
);

View File

@ -40,6 +40,7 @@ import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class IndexTaskUtils
{
@ -112,6 +113,10 @@ public class IndexTaskUtils
metricBuilder.setDimension(DruidMetrics.TASK_ID, task.getId());
metricBuilder.setDimension(DruidMetrics.TASK_TYPE, task.getType());
metricBuilder.setDimension(DruidMetrics.DATASOURCE, task.getDataSource());
metricBuilder.setDimensionIfNotNull(
DruidMetrics.TAGS,
task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS)
);
}
public static void setTaskDimensions(final ServiceMetricEvent.Builder metricBuilder, final AbstractTask task)
@ -119,7 +124,11 @@ public class IndexTaskUtils
metricBuilder.setDimension(DruidMetrics.TASK_ID, task.getId());
metricBuilder.setDimension(DruidMetrics.TASK_TYPE, task.getType());
metricBuilder.setDimension(DruidMetrics.DATASOURCE, task.getDataSource());
metricBuilder.setDimension(DruidMetrics.TASK_INGESTION_MODE, ((AbstractTask) task).getIngestionMode());
metricBuilder.setDimension(DruidMetrics.TASK_INGESTION_MODE, task.getIngestionMode());
metricBuilder.setDimensionIfNotNull(
DruidMetrics.TAGS,
task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS)
);
}
public static void setTaskStatusDimensions(

View File

@ -143,7 +143,8 @@ import java.util.stream.Stream;
* @param <PartitionIdType> the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type
* @param <SequenceOffsetType> the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers
*/
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> implements Supervisor
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
implements Supervisor
{
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
@ -433,27 +434,31 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
try {
long nowTime = System.currentTimeMillis();
if (spec.isSuspended()) {
log.info("Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is suspended",
dataSource
log.info(
"Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is suspended",
dataSource
);
return;
}
log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", pendingCompletionTaskGroups,
dataSource
dataSource
);
for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
if (!list.isEmpty()) {
log.info(
"Skipping DynamicAllocationTasksNotice execution for datasource [%s] because following tasks are pending [%s]",
dataSource, pendingCompletionTaskGroups
"Skipping DynamicAllocationTasksNotice execution for datasource [%s] because following tasks are pending [%s]",
dataSource,
pendingCompletionTaskGroups
);
return;
}
}
if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
log.info(
"DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
nowTime - dynamicTriggerLastRunTime, autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
"DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
dataSource
);
return;
}
@ -479,18 +484,20 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
* This method determines how to do scale actions based on collected lag points.
* If scale action is triggered :
* First of all, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing.
* Secondly, clear all the stateful data structures: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in the next 'RunNotice'.
* Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage.
* First of all, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing.
* Secondly, clear all the stateful data structures: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in the next 'RunNotice'.
* Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage.
* After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor.
*
* @param desiredActiveTaskCount desired taskCount computed from AutoScaler
* @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 'changeTaskCount'.
* If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
* If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException, TimeoutException
private boolean changeTaskCount(int desiredActiveTaskCount)
throws InterruptedException, ExecutionException, TimeoutException
{
int currentActiveTaskCount;
Collection<TaskGroup> activeTaskGroups = activelyReadingTaskGroups.values();
@ -500,8 +507,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return false;
} else {
log.info(
"Starting scale action, current active task count is [%d] and desired task count is [%d] for dataSource [%s].",
currentActiveTaskCount, desiredActiveTaskCount, dataSource
"Starting scale action, current active task count is [%d] and desired task count is [%d] for dataSource [%s].",
currentActiveTaskCount,
desiredActiveTaskCount,
dataSource
);
gracefulShutdownInternal();
changeTaskCountInIOConfig(desiredActiveTaskCount);
@ -796,14 +805,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
log.info("Running Task autoscaler for datasource [%s]", dataSource);
workerThreads = (this.tuningConfig.getWorkerThreads() != null
? this.tuningConfig.getWorkerThreads()
: Math.min(10, autoScalerConfig.getTaskCountMax()));
? this.tuningConfig.getWorkerThreads()
: Math.min(10, autoScalerConfig.getTaskCountMax()));
maxNumTasks = autoScalerConfig.getTaskCountMax() * this.ioConfig.getReplicas();
} else {
workerThreads = (this.tuningConfig.getWorkerThreads() != null
? this.tuningConfig.getWorkerThreads()
: Math.min(10, this.ioConfig.getTaskCount()));
? this.tuningConfig.getWorkerThreads()
: Math.min(10, this.ioConfig.getTaskCount()));
maxNumTasks = this.ioConfig.getTaskCount() * this.ioConfig.getReplicas();
}
@ -1246,7 +1255,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* Collect row ingestion stats from all tasks managed by this supervisor.
*
* @return A map of groupId->taskId->task row stats
*
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
@ -1321,7 +1329,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* Collect parse errors from all tasks managed by this supervisor.
*
* @return A list of parse error strings
*
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
@ -1975,12 +1982,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
* Returns a Pair of information about a task:
*
* <p>
* Left-hand side: Status of the task from {@link SeekableStreamIndexTaskClient#getStatusAsync}.
*
* <p>
* Right-hand side: If status is {@link SeekableStreamIndexTaskRunner.Status#PUBLISHING}, end offsets from
* {@link SeekableStreamIndexTaskClient#getEndOffsetsAsync}. Otherwise, null.
*
* <p>
* Used by {@link #discoverTasks()}.
*/
private ListenableFuture<Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdType, SequenceOffsetType>>> getStatusAndPossiblyEndOffsets(
@ -2049,14 +2056,18 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
log.info("Resumed task [%s] in first supervisor run.", taskId);
} else {
log.warn("Failed to resume task [%s] in first supervisor run.", taskId);
killTask(taskId,
"Killing forcefully as task could not be resumed in the first supervisor run after Overlord change.");
killTask(
taskId,
"Killing forcefully as task could not be resumed in the first supervisor run after Overlord change."
);
}
}
catch (Exception e) {
log.warn(e, "Failed to resume task [%s] in first supervisor run.", taskId);
killTask(taskId,
"Killing forcefully as task could not be resumed in the first supervisor run after Overlord change.");
killTask(
taskId,
"Killing forcefully as task could not be resumed in the first supervisor run after Overlord change."
);
}
}
},
@ -2341,8 +2352,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* Determines whether a given task was created by the current version of the supervisor.
* Uses the Task object mapped to this taskId in the {@code activeTaskMap}.
* If not found in the map, fetch it from the metadata store.
* @param taskGroupId task group id
* @param taskId task id
*
* @param taskGroupId task group id
* @param taskId task id
* @param activeTaskMap Set of active tasks that were pre-fetched
* @return true if the task was created by the current supervisor
*/
@ -2628,8 +2640,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
recordSupplierLock.lock();
try {
final Set<StreamPartition<PartitionIdType>> partitions = partitionIds.stream()
.map(partitionId -> new StreamPartition<>(ioConfig.getStream(), partitionId))
.collect(Collectors.toSet());
.map(partitionId -> new StreamPartition<>(
ioConfig.getStream(),
partitionId
))
.collect(Collectors.toSet());
if (!recordSupplier.getAssignment().containsAll(partitions)) {
recordSupplier.assign(partitions);
try {
@ -2740,7 +2755,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* by this method.
*
* @param availablePartitions
*
* @return a remapped copy of partitionGroups, containing only the partitions in availablePartitions
*/
protected Map<Integer, Set<PartitionIdType>> recomputePartitionGroupsForExpiration(
@ -2757,7 +2771,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
*
* @param currentMetadata The current DataSourceMetadata from metadata storage
* @param expiredPartitionIds The set of expired partition IDs.
*
* @return currentMetadata but with any expired partitions removed.
*/
protected SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadataWithExpiredPartitions(
@ -3396,7 +3409,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* should be removed from the starting offsets sent to the tasks.
*
* @param startingOffsets
*
* @return startingOffsets with entries for expired partitions removed
*/
protected Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> filterExpiredPartitionsFromStartingOffsets(
@ -3821,7 +3833,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
pendingCompletionTaskGroups
.values()
.stream()
.flatMap(taskGroups -> taskGroups.stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()))
.flatMap(taskGroups -> taskGroups.stream()
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()))
.flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream())
).collect(Collectors.toMap(
Entry::getKey,
@ -3829,7 +3842,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
(v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
));
partitionIds.forEach(partitionId -> currentOffsets.putIfAbsent(partitionId, offsetsFromMetadataStorage.get(partitionId)));
partitionIds.forEach(partitionId -> currentOffsets.putIfAbsent(
partitionId,
offsetsFromMetadataStorage.get(partitionId)
));
return currentOffsets;
}
}
@ -3935,6 +3951,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
* Get all active tasks from metadata storage
*
* @return map from taskId to Task
*/
private Map<String, Task> getActiveTaskMap()
@ -3969,7 +3986,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* the given replicas count
*
* @return list of specific kafka/kinesis index taksks
*
* @throws JsonProcessingException
*/
protected abstract List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>> createIndexTasks(
@ -3987,7 +4003,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* different between Kafka/Kinesis since Kinesis uses String as partition id
*
* @param partition partition id
*
* @return taskgroup id
*/
protected abstract int getTaskGroupIdForPartition(PartitionIdType partition);
@ -3997,7 +4012,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* of [kafka/kinesis]DataSourceMetadata
*
* @param metadata datasource metadata
*
* @return true if isInstance else false
*/
protected abstract boolean checkSourceMetadataMatch(DataSourceMetadata metadata);
@ -4007,7 +4021,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* [Kafka/Kinesis]IndexTask
*
* @param task task
*
* @return true if isInstance else false
*/
protected abstract boolean doesTaskTypeMatchSupervisor(Task task);
@ -4017,7 +4030,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
*
* @param stream stream name
* @param map partitionId -> sequence
*
* @return specific instance of datasource metadata
*/
protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetaDataForReset(
@ -4152,9 +4164,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
try {
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("noticeType", noticeType)
.setDimension("dataSource", dataSource)
.build("ingest/notices/time", timeInMillis)
.setDimension("noticeType", noticeType)
.setDimension("dataSource", dataSource)
.setDimensionIfNotNull(DruidMetrics.TAGS, spec.getContextValue(DruidMetrics.TAGS))
.build("ingest/notices/time", timeInMillis)
);
}
catch (Exception e) {
@ -4171,8 +4184,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
try {
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
.build("ingest/notices/queueSize", getNoticesQueueSize())
.setDimension("dataSource", dataSource)
.setDimensionIfNotNull(DruidMetrics.TAGS, spec.getContextValue(DruidMetrics.TAGS))
.build("ingest/notices/queueSize", getNoticesQueueSize())
);
}
catch (Exception e) {
@ -4207,12 +4221,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
LagStats lagStats = computeLags(partitionLags);
Map<String, Object> metricTags = spec.getContextValue(DruidMetrics.TAGS);
for (Map.Entry<PartitionIdType, Long> entry : partitionLags.entrySet()) {
emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
.setDimension(DruidMetrics.PARTITION, entry.getKey())
.setDimensionIfNotNull(DruidMetrics.TAGS, metricTags)
.build(
StringUtils.format("ingest/%s/partitionLag%s", type, suffix),
entry.getValue()
@ -4223,18 +4239,21 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
.setDimensionIfNotNull(DruidMetrics.TAGS, metricTags)
.build(StringUtils.format("ingest/%s/lag%s", type, suffix), lagStats.getTotalLag())
);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
.setDimensionIfNotNull(DruidMetrics.TAGS, metricTags)
.build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), lagStats.getMaxLag())
);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
.setDimensionIfNotNull(DruidMetrics.TAGS, metricTags)
.build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), lagStats.getAvgLag())
);
};
@ -4250,7 +4269,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
* This method computes maxLag, totalLag and avgLag
* This method computes maxLag, totalLag and avgLag
*
* @param partitionLags lags per partition
*/
protected LagStats computeLags(Map<PartitionIdType, Long> partitionLags)

View File

@ -132,6 +132,12 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
return context;
}
@Nullable
public <ContextValueType> ContextValueType getContextValue(String key)
{
return context == null ? null : (ContextValueType) context.get(key);
}
public ServiceEmitter getEmitter()
{
return emitter;

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.realtime.FireDepartment;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.HashMap;
import java.util.Map;
@RunWith(MockitoJUnitRunner.class)
public class TaskRealtimeMetricsMonitorTest
{
private static final Map<String, String[]> DIMENSIONS = ImmutableMap.of(
"dim1",
new String[]{"v1", "v2"},
"dim2",
new String[]{"vv"}
);
private static final Map<String, Object> TAGS = ImmutableMap.of("author", "Author Name", "version", 10);
@Mock(answer = Answers.RETURNS_MOCKS)
private FireDepartment fireDepartment;
@Mock(answer = Answers.RETURNS_MOCKS)
private RowIngestionMeters rowIngestionMeters;
@Mock
private ServiceEmitter emitter;
private Map<String, ServiceMetricEvent> emittedEvents;
private TaskRealtimeMetricsMonitor target;
@Before
public void setUp()
{
emittedEvents = new HashMap<>();
Mockito.doCallRealMethod().when(emitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
Mockito
.doAnswer(invocation -> {
ServiceMetricEvent e = invocation.getArgument(0);
emittedEvents.put(e.getMetric(), e);
return null;
})
.when(emitter).emit(ArgumentMatchers.any(Event.class));
target = new TaskRealtimeMetricsMonitor(fireDepartment, rowIngestionMeters, DIMENSIONS, TAGS);
}
@Test
public void testdoMonitorShouldEmitUserProvidedTags()
{
target.doMonitor(emitter);
for (ServiceMetricEvent sme : emittedEvents.values()) {
Assert.assertEquals(TAGS, sme.getUserDims().get(DruidMetrics.TAGS));
}
}
@Test
public void testdoMonitorWithoutTagsShouldNotEmitTags()
{
target = new TaskRealtimeMetricsMonitor(fireDepartment, rowIngestionMeters, DIMENSIONS, null);
for (ServiceMetricEvent sme : emittedEvents.values()) {
Assert.assertFalse(sme.getUserDims().containsKey(DruidMetrics.TAGS));
}
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Map;
@RunWith(MockitoJUnitRunner.class)
public class IndexTaskUtilsTest
{
private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20);
@Mock
private Task task;
@Mock
private AbstractTask abstractTask;
private ServiceMetricEvent.Builder metricBuilder;
@Before
public void setUp()
{
metricBuilder = ServiceMetricEvent.builder();
Mockito.when(task.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS);
Mockito.when(abstractTask.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS);
}
@Test
public void testSetTaskDimensionsWithContextTagsShouldSetTags()
{
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
Assert.assertEquals(METRIC_TAGS, metricBuilder.getDimension(DruidMetrics.TAGS));
}
@Test
public void testSetTaskDimensionsForAbstractTaskWithContextTagsShouldSetTags()
{
IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
Assert.assertEquals(METRIC_TAGS, metricBuilder.getDimension(DruidMetrics.TAGS));
}
@Test
public void testSetTaskDimensionsWithoutTagsShouldNotSetTags()
{
Mockito.when(task.getContextValue(DruidMetrics.TAGS)).thenReturn(null);
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
Assert.assertNull(metricBuilder.getDimension(DruidMetrics.TAGS));
}
@Test
public void testSetTaskDimensionsForAbstractTaskWithoutTagsShouldNotSetTags()
{
Mockito.when(abstractTask.getContextValue(DruidMetrics.TAGS)).thenReturn(null);
IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
Assert.assertNull(metricBuilder.getDimension(DruidMetrics.TAGS));
}
}

View File

@ -674,14 +674,15 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
EasyMock.replay(ingestionSchema);
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
.andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis",
"1",
"enableTaskAutoScaler",
true,
"taskCountMax",
"4",
"taskCountMin",
"1"
.andReturn(mapper.convertValue(ImmutableMap.of(
"lagCollectionIntervalMillis",
"1",
"enableTaskAutoScaler",
true,
"taskCountMax",
"4",
"taskCountMin",
"1"
), AutoScalerConfig.class))
.anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
@ -931,7 +932,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
null,
null,
new IdleConfig(true, null)
){
)
{
};
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
@ -981,6 +983,83 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
Assert.assertTrue(Objects.requireNonNull(spec.getIoConfig().getIdleConfig()).isEnabled());
}
@Test
public void testGetContextVauleWithNullContextShouldReturnNull()
{
mockIngestionSchema();
TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
ingestionSchema,
null,
false,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig,
supervisor4,
"id1"
);
Assert.assertNull(spec.getContextValue("key"));
}
@Test
public void testGetContextVauleForNonExistentKeyShouldReturnNull()
{
mockIngestionSchema();
TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
ingestionSchema,
ImmutableMap.of("key", "value"),
false,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig,
supervisor4,
"id1"
);
Assert.assertNull(spec.getContextValue("key_not_exists"));
}
@Test
public void testGetContextVauleForKeyShouldReturnValue()
{
mockIngestionSchema();
TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
ingestionSchema,
ImmutableMap.of("key", "value"),
false,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig,
supervisor4,
"id1"
);
Assert.assertEquals("value", spec.getContextValue("key"));
}
private void mockIngestionSchema()
{
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
}
private static DataSchema getDataSchema()
{
List<DimensionSchema> dimensions = new ArrayList<>();

View File

@ -68,6 +68,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
@ -110,6 +111,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
private static final String SHARD_ID = "0";
private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID);
private static final String EXCEPTION_MSG = "I had an exception";
private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20);
private TaskStorage taskStorage;
private TaskMaster taskMaster;
@ -151,6 +153,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
EasyMock.expect(taskClientFactory.build(
EasyMock.anyString(),
@ -792,16 +795,22 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
Assert.assertEquals(6, events.size());
Assert.assertEquals("ingest/test/lag", events.get(0).toMap().get("metric"));
Assert.assertEquals(850L, events.get(0).toMap().get("value"));
Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
Assert.assertEquals("ingest/test/maxLag", events.get(1).toMap().get("metric"));
Assert.assertEquals(500L, events.get(1).toMap().get("value"));
Assert.assertEquals(METRIC_TAGS, events.get(1).toMap().get(DruidMetrics.TAGS));
Assert.assertEquals("ingest/test/avgLag", events.get(2).toMap().get("metric"));
Assert.assertEquals(283L, events.get(2).toMap().get("value"));
Assert.assertEquals(METRIC_TAGS, events.get(2).toMap().get(DruidMetrics.TAGS));
Assert.assertEquals("ingest/test/lag/time", events.get(3).toMap().get("metric"));
Assert.assertEquals(45000L, events.get(3).toMap().get("value"));
Assert.assertEquals(METRIC_TAGS, events.get(3).toMap().get(DruidMetrics.TAGS));
Assert.assertEquals("ingest/test/maxLag/time", events.get(4).toMap().get("metric"));
Assert.assertEquals(20000L, events.get(4).toMap().get("value"));
Assert.assertEquals(METRIC_TAGS, events.get(4).toMap().get(DruidMetrics.TAGS));
Assert.assertEquals("ingest/test/avgLag/time", events.get(5).toMap().get("metric"));
Assert.assertEquals(15000L, events.get(5).toMap().get("value"));
Assert.assertEquals(METRIC_TAGS, events.get(5).toMap().get(DruidMetrics.TAGS));
verifyAll();
}
@ -872,10 +881,13 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
Assert.assertEquals(3, events.size());
Assert.assertEquals("ingest/test/lag/time", events.get(0).toMap().get("metric"));
Assert.assertEquals(45000L, events.get(0).toMap().get("value"));
Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
Assert.assertEquals("ingest/test/maxLag/time", events.get(1).toMap().get("metric"));
Assert.assertEquals(20000L, events.get(1).toMap().get("value"));
Assert.assertEquals(METRIC_TAGS, events.get(1).toMap().get(DruidMetrics.TAGS));
Assert.assertEquals("ingest/test/avgLag/time", events.get(2).toMap().get("metric"));
Assert.assertEquals(15000L, events.get(2).toMap().get("value"));
Assert.assertEquals(METRIC_TAGS, events.get(2).toMap().get(DruidMetrics.TAGS));
verifyAll();
}
@ -909,6 +921,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
Assert.assertEquals(1, events.size());
Assert.assertEquals("ingest/notices/queueSize", events.get(0).toMap().get("metric"));
Assert.assertEquals(0, events.get(0).toMap().get("value"));
Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
verifyAll();
}
@ -936,6 +949,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
events = filterMetrics(events, whitelist);
Assert.assertEquals(1, events.size());
Assert.assertEquals("ingest/notices/time", events.get(0).toMap().get("metric"));
Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
Assert.assertTrue(String.valueOf(events.get(0).toMap().get("value")), (long) events.get(0).toMap().get("value") > 0);
Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
Assert.assertEquals("run_notice", events.get(0).toMap().get("noticeType"));
@ -1063,6 +1077,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes();
EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();

View File

@ -52,6 +52,8 @@ public class DruidMetrics
public static final String PARTITION = "partition";
public static final String TAGS = "tags";
public static int findNumComplexAggs(List<AggregatorFactory> aggs)
{
int retVal = 0;