mirror of https://github.com/apache/druid.git
prometheus metric exporter (#10412)
* prometheus-emitter * use existing jetty server to expose prometheus collection endpoint * unused variables * better variable names * removed unused dependencies * more metric definitions * reorganize * use prometheus HTTPServer instead of hooking into Jetty server * temporary empty help string * temporary non-empty help. fix incorrect dimension value in JSON (also updated statsd json) * added full help text. added metric conversion factor for timers that are not using seconds. Correct metric dimension name in documentation * added documentation for prometheus emitter * safety for invalid labelNames * fix travis checks * Unit test and better sanitization of metrics names and label values * add precondition to check namespace against regex * use precompiled regex * remove static imports. fix metric types * better docs. fix possible NPE in PrometheusEmitterConfig. Guard against multiple calls to PrometheusEmitter.start() * Update regex for label-value replacements to allow internal numeric values. Additional tests * Adds missing license header updates website/.spelling to add words used in prometheus-emitter docs. updates docs/operations/metrics.md to correct the spelling of bufferPoolName * fixes version in extensions-contrib/prometheus-emitter * fix style guide errors * update import ordering * add another word to website/.spelling * remove unthrown declared exception * remove unused import * Pushgateway strategy for metrics * typo * Format fix and nullable strategy * Update pom file for prometheus-emitter * code review comments. Counter to gauge for cache metrics, periodical task to pushGateway * Syntax fix * Dimension label regex include numeric character back, fix previous commit * bump prometheus-emitter pom dev version * Remove scheduled task inside poen that push metrics * Fix checkstyle * Unit test coverage * Unit test coverage * Spelling * Doc fix * spelling Co-authored-by: Michael Schiff <michael.schiff@tubemogul.com> Co-authored-by: Michael Schiff <schiff.michael@gmail.com> Co-authored-by: Tianxin Zhao <tianxin.zhao@tubemogul.com> Co-authored-by: Tianxin Zhao <tizhao@adobe.com>
This commit is contained in:
parent
c66951a59e
commit
a57c28e9ce
|
@ -0,0 +1,70 @@
|
|||
---
|
||||
id: prometheus
|
||||
title: "Prometheus Emitter"
|
||||
---
|
||||
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. The ASF licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing,
|
||||
~ software distributed under the License is distributed on an
|
||||
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
~ KIND, either express or implied. See the License for the
|
||||
~ specific language governing permissions and limitations
|
||||
~ under the License.
|
||||
-->
|
||||
|
||||
|
||||
To use this Apache Druid extension, make sure to [include](../../development/extensions.md#loading-extensions) `prometheus-emitter` extension.
|
||||
|
||||
## Introduction
|
||||
|
||||
This extension exposes [Druid metrics](https://druid.apache.org/docs/latest/operations/metrics.html) for collection by a Prometheus server (https://prometheus.io/).
|
||||
Emitter is enabled by setting `druid.emitter=prometheus` [configs](https://druid.apache.org/docs/latest/configuration/index.html#emitting-metrics) or include `prometheus` in the composing emitter list.
|
||||
|
||||
## Configuration
|
||||
|
||||
All the configuration parameters for the Prometheus emitter are under `druid.emitter.prometheus`.
|
||||
|
||||
|property|description|required?|default|
|
||||
|--------|-----------|---------|-------|
|
||||
|`druid.emitter.prometheus.strategy`|The strategy to expose prometheus metrics. Default strategy `exporter` would expose metrics for scraping purpose. Only peon task (short-lived jobs) need to use `pushgateway` strategy.|yes|exporter|
|
||||
|`druid.emitter.prometheus.port`|The port on which to expose the prometheus HTTPServer. Required if using exporter strategy.|no|none|
|
||||
|`druid.emitter.prometheus.namespace`|Optional metric namespace. Must match the regex `[a-zA-Z_:][a-zA-Z0-9_:]*`|no|"druid"|
|
||||
|`druid.emitter.prometheus.dimensionMapPath`|JSON file defining the Prometheus metric type, desired dimensions, help text, and conversionFactor for every Druid metric.|no|Default mapping provided. See below.|
|
||||
|`druid.emitter.prometheus.pushGatewayAddress`|Pushgateway address. Required if using Pushgateway strategy|no|none|
|
||||
|
||||
|
||||
### Metric names
|
||||
|
||||
All metric names and labels are reformatted to match Prometheus standards.
|
||||
- For names: all characters which are not alphanumeric, underscores, or colons (matching `[^a-zA-Z_:][^a-zA-Z0-9_:]*`) are replaced with `_`
|
||||
- For labels: all characters which are not alphanumeric or underscores (matching `[^a-zA-Z0-9_][^a-zA-Z0-9_]*`) are replaced with `_`
|
||||
|
||||
### Metric mapping
|
||||
|
||||
Each metric to be collected by Prometheus must specify a type, one of `[timer, counter, guage]`. Prometheus Emitter expects this mapping to
|
||||
be provided as a JSON file. Additionally, this mapping specifies which dimensions should be included for each metric. Prometheus expects
|
||||
histogram timers to use Seconds as the base unit. Timers which do not use seconds as a base unit can use the `conversionFactor` to set
|
||||
the base time unit. If the user does not specify their own JSON file, a default mapping is used. All
|
||||
metrics are expected to be mapped. Metrics which are not mapped will not be tracked.
|
||||
Prometheus metric path is organized using the following schema:
|
||||
`<druid metric name> : { "dimensions" : <dimension list>, "type" : <timer|counter|gauge>, conversionFactor: <conversionFactor>, "help" : <help text>,}`
|
||||
e.g.
|
||||
`query/time" : { "dimensions" : ["dataSource", "type"], "conversionFactor": 1000.0, "type" : "timer", "help": "Seconds taken to complete a query."}`
|
||||
|
||||
For metrics which are emitted from multiple services with different dimensions, the metric name is prefixed with
|
||||
the service name.
|
||||
e.g.
|
||||
`"coordinator-segment/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
|
||||
"historical-segment/count" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge" }`
|
||||
|
||||
For most use-cases, the default mapping is sufficient.
|
|
@ -0,0 +1,123 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. The ASF licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing,
|
||||
~ software distributed under the License is distributed on an
|
||||
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
~ KIND, either express or implied. See the License for the
|
||||
~ specific language governing permissions and limitations
|
||||
~ under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>druid</artifactId>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
<version>0.22.0-SNAPSHOT</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>org.apache.druid.extensions.contrib</groupId>
|
||||
<artifactId>prometheus-emitter</artifactId>
|
||||
<name>prometheus-emitter</name>
|
||||
<description>Extension support for collecting Druid metrics with Prometheus</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
<artifactId>druid-core</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.prometheus</groupId>
|
||||
<artifactId>simpleclient</artifactId>
|
||||
<version>0.7.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.prometheus</groupId>
|
||||
<artifactId>simpleclient_httpserver</artifactId>
|
||||
<version>0.7.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.prometheus</groupId>
|
||||
<artifactId>simpleclient_pushgateway</artifactId>
|
||||
<version>0.7.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.code.findbugs</groupId>
|
||||
<artifactId>jsr305</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-annotations</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>joda-time</groupId>
|
||||
<artifactId>joda-time</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.inject</groupId>
|
||||
<artifactId>guice</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.easymock</groupId>
|
||||
<artifactId>easymock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>pl.pragmatists</groupId>
|
||||
<artifactId>JUnitParams</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
<artifactId>druid-server</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.prometheus;
|
||||
|
||||
import io.prometheus.client.SimpleCollector;
|
||||
|
||||
public class DimensionsAndCollector
|
||||
{
|
||||
private final String[] dimensions;
|
||||
private final SimpleCollector collector;
|
||||
private final double conversionFactor;
|
||||
|
||||
DimensionsAndCollector(String[] dimensions, SimpleCollector collector, double conversionFactor)
|
||||
{
|
||||
this.dimensions = dimensions;
|
||||
this.collector = collector;
|
||||
this.conversionFactor = conversionFactor;
|
||||
}
|
||||
|
||||
public String[] getDimensions()
|
||||
{
|
||||
return dimensions;
|
||||
}
|
||||
|
||||
public SimpleCollector getCollector()
|
||||
{
|
||||
return collector;
|
||||
}
|
||||
|
||||
public double getConversionFactor()
|
||||
{
|
||||
return conversionFactor;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.prometheus;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Strings;
|
||||
import io.prometheus.client.Counter;
|
||||
import io.prometheus.client.Gauge;
|
||||
import io.prometheus.client.Histogram;
|
||||
import io.prometheus.client.SimpleCollector;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.SortedSet;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class Metrics
|
||||
{
|
||||
|
||||
private static final Logger log = new Logger(Metrics.class);
|
||||
private final Map<String, DimensionsAndCollector> registeredMetrics = new HashMap<>();
|
||||
private final ObjectMapper mapper = new ObjectMapper();
|
||||
public static final Pattern PATTERN = Pattern.compile("[^a-zA-Z_:][^a-zA-Z0-9_:]*");
|
||||
|
||||
public DimensionsAndCollector getByName(String name, String service)
|
||||
{
|
||||
if (registeredMetrics.containsKey(name)) {
|
||||
return registeredMetrics.get(name);
|
||||
} else {
|
||||
return registeredMetrics.getOrDefault(service + "_" + name, null);
|
||||
}
|
||||
}
|
||||
|
||||
public Metrics(String namespace, String path)
|
||||
{
|
||||
Map<String, Metric> metrics = readConfig(path);
|
||||
for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
|
||||
String name = entry.getKey();
|
||||
Metric metric = entry.getValue();
|
||||
Metric.Type type = metric.type;
|
||||
String[] dimensions = metric.dimensions.toArray(new String[0]);
|
||||
String formattedName = PATTERN.matcher(StringUtils.toLowerCase(name)).replaceAll("_");
|
||||
SimpleCollector collector = null;
|
||||
if (Metric.Type.count.equals(type)) {
|
||||
collector = new Counter.Builder()
|
||||
.namespace(namespace)
|
||||
.name(formattedName)
|
||||
.labelNames(dimensions)
|
||||
.help(metric.help)
|
||||
.register();
|
||||
} else if (Metric.Type.gauge.equals(type)) {
|
||||
collector = new Gauge.Builder()
|
||||
.namespace(namespace)
|
||||
.name(formattedName)
|
||||
.labelNames(dimensions)
|
||||
.help(metric.help)
|
||||
.register();
|
||||
} else if (Metric.Type.timer.equals(type)) {
|
||||
collector = new Histogram.Builder()
|
||||
.namespace(namespace)
|
||||
.name(formattedName)
|
||||
.labelNames(dimensions)
|
||||
.buckets(.1, .25, .5, .75, 1, 2.5, 5, 7.5, 10, 30, 60, 120, 300)
|
||||
.help(metric.help)
|
||||
.register();
|
||||
} else {
|
||||
log.error("Unrecognized metric type [%s]", type);
|
||||
}
|
||||
|
||||
if (collector != null) {
|
||||
registeredMetrics.put(name, new DimensionsAndCollector(dimensions, collector, metric.conversionFactor));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Map<String, Metric> readConfig(String path)
|
||||
{
|
||||
try {
|
||||
InputStream is;
|
||||
if (Strings.isNullOrEmpty(path)) {
|
||||
log.info("Using default metric configuration");
|
||||
is = this.getClass().getClassLoader().getResourceAsStream("defaultMetrics.json");
|
||||
} else {
|
||||
log.info("Using metric configuration at [%s]", path);
|
||||
is = new FileInputStream(new File(path));
|
||||
}
|
||||
return mapper.readerFor(new TypeReference<Map<String, Metric>>()
|
||||
{
|
||||
}).readValue(is);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new ISE(e, "Failed to parse metric configuration");
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, DimensionsAndCollector> getRegisteredMetrics()
|
||||
{
|
||||
return registeredMetrics;
|
||||
}
|
||||
|
||||
public static class Metric
|
||||
{
|
||||
public final SortedSet<String> dimensions;
|
||||
public final Type type;
|
||||
public final String help;
|
||||
public final double conversionFactor;
|
||||
|
||||
@JsonCreator
|
||||
public Metric(
|
||||
@JsonProperty("dimensions") SortedSet<String> dimensions,
|
||||
@JsonProperty("type") Type type,
|
||||
@JsonProperty("help") String help,
|
||||
@JsonProperty("conversionFactor") double conversionFactor
|
||||
)
|
||||
{
|
||||
this.dimensions = dimensions;
|
||||
this.type = type;
|
||||
this.help = help;
|
||||
this.conversionFactor = conversionFactor;
|
||||
}
|
||||
|
||||
public enum Type
|
||||
{
|
||||
count, gauge, timer
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,176 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.prometheus;
|
||||
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.prometheus.client.Counter;
|
||||
import io.prometheus.client.Gauge;
|
||||
import io.prometheus.client.Histogram;
|
||||
import io.prometheus.client.exporter.HTTPServer;
|
||||
import io.prometheus.client.exporter.PushGateway;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.java.util.emitter.core.Emitter;
|
||||
import org.apache.druid.java.util.emitter.core.Event;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class PrometheusEmitter implements Emitter
|
||||
{
|
||||
private static final Logger log = new Logger(PrometheusEmitter.class);
|
||||
private final Metrics metrics;
|
||||
private final PrometheusEmitterConfig config;
|
||||
private final PrometheusEmitterConfig.Strategy strategy;
|
||||
private static final Pattern PATTERN = Pattern.compile("[^a-zA-Z0-9_][^a-zA-Z0-9_]*");
|
||||
|
||||
private HTTPServer server;
|
||||
private PushGateway pushGateway;
|
||||
private String identifier;
|
||||
|
||||
static PrometheusEmitter of(PrometheusEmitterConfig config)
|
||||
{
|
||||
return new PrometheusEmitter(config);
|
||||
}
|
||||
|
||||
public PrometheusEmitter(PrometheusEmitterConfig config)
|
||||
{
|
||||
this.config = config;
|
||||
this.strategy = config.getStrategy();
|
||||
metrics = new Metrics(config.getNamespace(), config.getDimensionMapPath());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void start()
|
||||
{
|
||||
if (strategy.equals(PrometheusEmitterConfig.Strategy.exporter)) {
|
||||
if (server == null) {
|
||||
try {
|
||||
server = new HTTPServer(config.getPort());
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.error(e, "Unable to start prometheus HTTPServer");
|
||||
}
|
||||
} else {
|
||||
log.error("HTTPServer is already started");
|
||||
}
|
||||
} else if (strategy.equals(PrometheusEmitterConfig.Strategy.pushgateway)) {
|
||||
pushGateway = new PushGateway(config.getPushGatewayAddress());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void emit(Event event)
|
||||
{
|
||||
if (event instanceof ServiceMetricEvent) {
|
||||
emitMetric((ServiceMetricEvent) event);
|
||||
}
|
||||
}
|
||||
|
||||
private void emitMetric(ServiceMetricEvent metricEvent)
|
||||
{
|
||||
String name = metricEvent.getMetric();
|
||||
String service = metricEvent.getService();
|
||||
Map<String, Object> userDims = metricEvent.getUserDims();
|
||||
identifier = (userDims.get("task") == null ? metricEvent.getHost() : (String) userDims.get("task"));
|
||||
Number value = metricEvent.getValue();
|
||||
|
||||
DimensionsAndCollector metric = metrics.getByName(name, service);
|
||||
if (metric != null) {
|
||||
String[] labelValues = new String[metric.getDimensions().length];
|
||||
String[] labelNames = metric.getDimensions();
|
||||
for (int i = 0; i < labelValues.length; i++) {
|
||||
String labelName = labelNames[i];
|
||||
//labelName is controlled by the user. Instead of potential NPE on invalid labelName we use "unknown" as the dimension value
|
||||
Object userDim = userDims.get(labelName);
|
||||
labelValues[i] = userDim != null ? PATTERN.matcher(userDim.toString()).replaceAll("_") : "unknown";
|
||||
}
|
||||
|
||||
if (metric.getCollector() instanceof Counter) {
|
||||
((Counter) metric.getCollector()).labels(labelValues).inc(value.doubleValue());
|
||||
} else if (metric.getCollector() instanceof Gauge) {
|
||||
((Gauge) metric.getCollector()).labels(labelValues).set(value.doubleValue());
|
||||
} else if (metric.getCollector() instanceof Histogram) {
|
||||
((Histogram) metric.getCollector()).labels(labelValues).observe(value.doubleValue() / metric.getConversionFactor());
|
||||
} else {
|
||||
log.error("Unrecognized metric type [%s]", metric.getCollector().getClass());
|
||||
}
|
||||
} else {
|
||||
log.debug("Unmapped metric [%s]", name);
|
||||
}
|
||||
}
|
||||
|
||||
private void pushMetric()
|
||||
{
|
||||
Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
|
||||
try {
|
||||
for (DimensionsAndCollector collector : map.values()) {
|
||||
if (config.getNamespace() != null) {
|
||||
pushGateway.push(collector.getCollector(), config.getNamespace(), ImmutableMap.of(config.getNamespace(), identifier));
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.error(e, "Unable to push prometheus metrics to pushGateway");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush()
|
||||
{
|
||||
if (pushGateway != null) {
|
||||
pushMetric();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
if (strategy.equals(PrometheusEmitterConfig.Strategy.exporter)) {
|
||||
if (server != null) {
|
||||
server.stop();
|
||||
}
|
||||
} else {
|
||||
flush();
|
||||
}
|
||||
}
|
||||
|
||||
public HTTPServer getServer()
|
||||
{
|
||||
return server;
|
||||
}
|
||||
|
||||
public PushGateway getPushGateway()
|
||||
{
|
||||
return pushGateway;
|
||||
}
|
||||
|
||||
public void setPushGateway(PushGateway pushGateway)
|
||||
{
|
||||
this.pushGateway = pushGateway;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.prometheus;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class PrometheusEmitterConfig
|
||||
{
|
||||
|
||||
static final Pattern PATTERN = Pattern.compile("[a-zA-Z_:][a-zA-Z0-9_:]*");
|
||||
|
||||
@JsonProperty
|
||||
private final Strategy strategy;
|
||||
|
||||
@JsonProperty
|
||||
@Nullable
|
||||
private final String namespace;
|
||||
|
||||
@JsonProperty
|
||||
@Nullable
|
||||
private final String dimensionMapPath;
|
||||
|
||||
@JsonProperty
|
||||
@Nullable
|
||||
private final Integer port;
|
||||
|
||||
@JsonProperty
|
||||
@Nullable
|
||||
private final String pushGatewayAddress;
|
||||
|
||||
@JsonCreator
|
||||
public PrometheusEmitterConfig(
|
||||
@JsonProperty("strategy") @Nullable Strategy strategy,
|
||||
@JsonProperty("namespace") @Nullable String namespace,
|
||||
@JsonProperty("dimensionMapPath") @Nullable String dimensionMapPath,
|
||||
@JsonProperty("port") @Nullable Integer port,
|
||||
@JsonProperty("pushGatewayAddress") @Nullable String pushGatewayAddress
|
||||
)
|
||||
{
|
||||
|
||||
this.strategy = strategy != null ? strategy : Strategy.exporter;
|
||||
this.namespace = namespace != null ? namespace : "druid";
|
||||
Preconditions.checkArgument(PATTERN.matcher(this.namespace).matches(), "Invalid namespace " + this.namespace);
|
||||
this.dimensionMapPath = dimensionMapPath;
|
||||
this.port = port;
|
||||
this.pushGatewayAddress = pushGatewayAddress;
|
||||
}
|
||||
|
||||
public String getNamespace()
|
||||
{
|
||||
return namespace;
|
||||
}
|
||||
|
||||
public String getDimensionMapPath()
|
||||
{
|
||||
return dimensionMapPath;
|
||||
}
|
||||
|
||||
public int getPort()
|
||||
{
|
||||
return port;
|
||||
}
|
||||
|
||||
public String getPushGatewayAddress()
|
||||
{
|
||||
return pushGatewayAddress;
|
||||
}
|
||||
|
||||
public Strategy getStrategy()
|
||||
{
|
||||
return strategy;
|
||||
}
|
||||
|
||||
public enum Strategy
|
||||
{
|
||||
exporter, pushgateway
|
||||
}
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.prometheus;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.name.Named;
|
||||
import org.apache.druid.guice.JsonConfigProvider;
|
||||
import org.apache.druid.guice.ManageLifecycle;
|
||||
import org.apache.druid.initialization.DruidModule;
|
||||
import org.apache.druid.java.util.emitter.core.Emitter;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class PrometheusEmitterModule implements DruidModule
|
||||
{
|
||||
private static final String EMITTER_TYPE = "prometheus";
|
||||
|
||||
@Override
|
||||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, PrometheusEmitterConfig.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@ManageLifecycle
|
||||
@Named(EMITTER_TYPE)
|
||||
public Emitter getEmitter(PrometheusEmitterConfig config)
|
||||
{
|
||||
return PrometheusEmitter.of(config);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.druid.emitter.prometheus.PrometheusEmitterModule
|
|
@ -0,0 +1,128 @@
|
|||
{
|
||||
"query/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to complete a query."},
|
||||
"query/bytes" : { "dimensions" : ["dataSource", "type"], "type" : "count", "help": "Number of bytes returned in query response."},
|
||||
"query/node/time" : { "dimensions" : ["server"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual historical/realtime processes."},
|
||||
"query/node/ttfb" : { "dimensions" : ["server"], "type" : "timer", "help": "Time to first byte. Seconds elapsed until Broker starts receiving the response from individual historical/realtime processes."},
|
||||
"query/node/bytes" : { "dimensions" : ["server"], "type" : "count", "help": "Number of bytes returned from querying individual historical/realtime processes."},
|
||||
"query/node/backpressure": { "dimensions" : ["server"], "type" : "timer", "help": "Seconds that the channel to this process has spent suspended due to backpressure."},
|
||||
"query/intervalChunk/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Only emitted if interval chunking is enabled. Milliseconds required to query an interval chunk. This metric is deprecated and will be removed in the future because interval chunking is deprecated."},
|
||||
|
||||
"query/segment/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual segment. Includes time to page in the segment from disk."},
|
||||
"query/wait/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent waiting for a segment to be scanned."},
|
||||
"segment/scan/pending" : { "dimensions" : [], "type" : "gauge", "help": "Number of segments in queue waiting to be scanned."},
|
||||
"query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual segment or hit the cache (if it is enabled on the Historical process)."},
|
||||
"query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": "1000000", "help": "Seconds of CPU time taken to complete a query"},
|
||||
|
||||
"query/count" : { "dimensions" : [], "type" : "count", "help": "Number of total queries" },
|
||||
"query/success/count" : { "dimensions" : [], "type" : "count", "help": "Number of queries successfully processed"},
|
||||
"query/failed/count" : { "dimensions" : [], "type" : "count", "help": "Number of failed queries"},
|
||||
"query/interrupted/count" : { "dimensions" : [], "type" : "count", "help": "Number of queries interrupted due to cancellation or timeout"},
|
||||
|
||||
"query/cache/delta/numEntries" : { "dimensions" : [], "type" : "gauge", "help": "Number of entries in cache"},
|
||||
"query/cache/delta/sizeBytes" : { "dimensions" : [], "type" : "gauge", "help": "Size of cache in bytes."},
|
||||
"query/cache/delta/hits" : { "dimensions" : [], "type" : "gauge", "help": "Number of cache hits."},
|
||||
"query/cache/delta/misses" : { "dimensions" : [], "type" : "gauge", "help": "Number of cache misses."},
|
||||
"query/cache/delta/evictions" : { "dimensions" : [], "type" : "gauge", "help": "Number of cache evictions."},
|
||||
"query/cache/delta/hitRate" : { "dimensions" : [], "type" : "gauge","help": "Cache hit rate."},
|
||||
"query/cache/delta/averageBytes" : { "dimensions" : [], "type" : "gauge", "help": "Average size of record in bytes"},
|
||||
"query/cache/delta/timeouts" : { "dimensions" : [], "type" : "gauge", "help": "Number of cache timeouts"},
|
||||
"query/cache/delta/errors" : { "dimensions" : [], "type" : "gauge", "help": "Number of cache errors."},
|
||||
|
||||
"query/cache/total/numEntries" : { "dimensions" : [], "type" : "gauge","help": "Total number of entries in cache" },
|
||||
"query/cache/total/sizeBytes" : { "dimensions" : [], "type" : "gauge", "help": "Total size of cache in bytes."},
|
||||
"query/cache/total/hits" : { "dimensions" : [], "type" : "gauge", "help": "Total number of cache hits."},
|
||||
"query/cache/total/misses" : { "dimensions" : [], "type" : "gauge", "help": "Total number of cache misses." },
|
||||
"query/cache/total/evictions" : { "dimensions" : [], "type" : "gauge", "help": "Total number of cache evictions."},
|
||||
"query/cache/total/hitRate" : { "dimensions" : [], "type" : "gauge", "help": "Total cache hit rate"},
|
||||
"query/cache/total/averageBytes" : { "dimensions" : [], "type" : "gauge", "help": "Total average record size in bytes"},
|
||||
"query/cache/total/timeouts" : { "dimensions" : [], "type" : "gauge", "help": "Total number of cache timeouts"},
|
||||
"query/cache/total/errors" : { "dimensions" : [], "type" : "gauge", "help": "Total number of cache errors" },
|
||||
|
||||
"ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because they are outside the windowPeriod."},
|
||||
"ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are unparseable." },
|
||||
"ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are duplicated."},
|
||||
"ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events successfully processed per emission period." },
|
||||
"ingest/events/messageGap" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Time gap between the data time in event and current system time."},
|
||||
"ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of Druid rows persisted."},
|
||||
"ingest/persists/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of times persist occurred." },
|
||||
"ingest/persists/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent doing intermediate persist."},
|
||||
"ingest/persists/cpu" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000000000.0, "help": "Cpu time in Seconds spent on doing intermediate persist." },
|
||||
"ingest/persists/backPressure" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Seconds spent creating persist tasks and blocking waiting for them to finish." },
|
||||
"ingest/persists/failed" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of persists that failed." },
|
||||
"ingest/handoff/failed" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of handoffs that failed." },
|
||||
"ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent merging intermediate segments" },
|
||||
"ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000000000.0, "help": "Cpu time in Seconds spent on merging intermediate segments."},
|
||||
|
||||
"ingest/kafka/lag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
|
||||
"ingest/kafka/maxLag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
|
||||
"ingest/kafka/avgLag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
|
||||
|
||||
"task/success/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included."},
|
||||
"task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included."},
|
||||
"task/running/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included."},
|
||||
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included."},
|
||||
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included."},
|
||||
"task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer", "conversionFactor": 1000.0, "help": "Milliseconds taken to run a task."},
|
||||
|
||||
"segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Size in bytes of new segments created." },
|
||||
"segment/moved/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Size in bytes of segments moved/archived via the Move Task." },
|
||||
"segment/nuked/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Size in bytes of segments deleted via the Kill Task." },
|
||||
|
||||
"segment/assigned/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments assigned to be loaded in the cluster."},
|
||||
"segment/moved/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments moved in the cluster." },
|
||||
"segment/dropped/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments dropped due to being overshadowed." },
|
||||
"segment/deleted/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments dropped due to rules."},
|
||||
"segment/unneeded/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments dropped due to being marked as unused."},
|
||||
"segment/unavailable/count" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Number of segments (not including replicas) left to load until segments that should be loaded in the cluster are available for queries."},
|
||||
"segment/underReplicated/count" : { "dimensions" : ["dataSource", "tier"], "type" : "gauge", "help": "Number of segments (including replicas) left to load until segments that should be loaded in the cluster are available for queries."},
|
||||
"segment/cost/raw" : { "dimensions" : ["tier"], "type" : "count", "help": "Used in cost balancing. The raw cost of hosting segments."},
|
||||
"segment/cost/normalization" : { "dimensions" : ["tier"], "type" : "count", "help": "Used in cost balancing. The normalization of hosting segments."},
|
||||
"segment/cost/normalized" : { "dimensions" : ["tier"], "type" : "count", "help": "Used in cost balancing. The normalized cost of hosting segments."},
|
||||
"segment/loadQueue/size" : { "dimensions" : ["server"], "type" : "gauge", "help": "Size in bytes of segments to load."},
|
||||
"segment/loadQueue/failed" : { "dimensions" : ["server"], "type" : "gauge", "help": "Number of segments that failed to load."},
|
||||
"segment/loadQueue/count" : { "dimensions" : ["server"], "type" : "gauge", "help": "Number of segments to load."},
|
||||
"segment/dropQueue/count" : { "dimensions" : ["server"], "type" : "gauge", "help": "Number of segments to drop."},
|
||||
"segment/size" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Size in bytes of available segments."},
|
||||
"segment/overShadowed/count" : { "dimensions" : [], "type" : "gauge", "help": "Number of overShadowed segments."},
|
||||
|
||||
"segment/max" : { "dimensions" : [], "type" : "gauge", "help": "Maximum byte limit available for segments."},
|
||||
"segment/used" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge", "help": "Bytes used for served segments."},
|
||||
"segment/usedPercent" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge", "help": "Percentage of space used by served segments."},
|
||||
"segment/pendingDelete" : { "dimensions" : [], "type" : "gauge", "help": "On-disk size in bytes of segments that are waiting to be cleared out"},
|
||||
|
||||
"jvm/pool/committed" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge", "help": "Committed pool."},
|
||||
"jvm/pool/init" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge", "help": "Initial pool."},
|
||||
"jvm/pool/max" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge", "help": "Max pool."},
|
||||
"jvm/pool/used" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge", "help": "Pool used."},
|
||||
"jvm/bufferpool/count" : { "dimensions" : ["bufferpoolName"], "type" : "gauge", "help": "bufferpool count"},
|
||||
"jvm/bufferpool/used" : { "dimensions" : ["bufferpoolName"], "type" : "gauge", "help": "bufferpool used"},
|
||||
"jvm/bufferpool/capacity" : { "dimensions" : ["bufferpoolName"], "type" : "gauge", "help": "bufferpool capacity"},
|
||||
"jvm/mem/init" : { "dimensions" : ["memKind"], "type" : "gauge", "help": "Initial memory"},
|
||||
"jvm/mem/max" : { "dimensions" : ["memKind"], "type" : "gauge", "help": "Max memory"},
|
||||
"jvm/mem/used" : { "dimensions" : ["memKind"], "type" : "gauge", "help": "Used memory"},
|
||||
"jvm/mem/committed" : { "dimensions" : ["memKind"], "type" : "gauge", "help": "Committed memory"},
|
||||
"jvm/gc/count" : { "dimensions" : ["gcName"], "type" : "count", "help": "Garbage collection count"},
|
||||
"jvm/gc/cpu" : { "dimensions" : ["gcName"], "type" : "count", "help": "Count of CPU time in Nanoseconds spent on garbage collection. Note: `jvm/gc/cpu` represents the total time over multiple GC cycles; divide by `jvm/gc/count` to get the mean GC time per cycle."},
|
||||
|
||||
"ingest/events/buffered" : { "dimensions" : ["serviceName", "bufferCapacity"], "type" : "gauge", "help": "Number of events queued in the EventReceiverFirehose's buffer"},
|
||||
|
||||
"sys/swap/free" : { "dimensions" : [], "type" : "gauge", "help": "Free swap"},
|
||||
"sys/swap/max" : { "dimensions" : [], "type" : "gauge", "help": "Max swap"},
|
||||
"sys/swap/pageIn" : { "dimensions" : [], "type" : "gauge", "help": "Paged in swap"},
|
||||
"sys/swap/pageOut" : { "dimensions" : [], "type" : "gauge", "help": "Paged out swap"},
|
||||
"sys/disk/write/count" : { "dimensions" : ["fsDevName"], "type" : "count", "help": "Writes to disk."},
|
||||
"sys/disk/read/count" : { "dimensions" : ["fsDevName"], "type" : "count", "help": "Reads from disk."},
|
||||
"sys/disk/write/size" : { "dimensions" : ["fsDevName"], "type" : "count", "help": "Bytes written to disk. Can we used to determine how much paging is occurring with regards to segments."},
|
||||
"sys/disk/read/size" : { "dimensions" : ["fsDevName"], "type" : "count", "help": "Bytes read from disk. Can we used to determine how much paging is occurring with regards to segments."},
|
||||
"sys/net/write/size" : { "dimensions" : [], "type" : "count", "help": "Bytes written to the network."},
|
||||
"sys/net/read/size" : { "dimensions" : [], "type" : "count", "help": "Bytes read from the network."},
|
||||
"sys/fs/used" : { "dimensions" : ["fsDevName", "fsDirName", "fsTypeName", "fsSysTypeName", "fsOptions"], "type" : "gauge", "help": "Filesystem bytes used."},
|
||||
"sys/fs/max" : { "dimensions" : ["fsDevName", "fsDirName", "fsTypeName", "fsSysTypeName", "fsOptions"], "type" : "gauge", "help": "Filesystesm bytes max."},
|
||||
"sys/mem/used" : { "dimensions" : [], "type" : "gauge", "help": "Memory used."},
|
||||
"sys/mem/max" : { "dimensions" : [], "type" : "gauge", "help": "Memory max"},
|
||||
"sys/storage/used" : { "dimensions" : ["fsDirName"], "type" : "gauge", "help": "Disk space used."},
|
||||
"sys/cpu" : { "dimensions" : ["cpuName", "cpuTime"], "type" : "gauge", "help": "CPU used"},
|
||||
|
||||
"coordinator_segment/count" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Number of served segments."},
|
||||
"historical_segment/count" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge", "help": "Number of served segments."}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.prometheus;
|
||||
|
||||
import io.prometheus.client.Histogram;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class MetricsTest
|
||||
{
|
||||
@Test
|
||||
public void testMetricsConfiguration()
|
||||
{
|
||||
Metrics metrics = new Metrics("test", null);
|
||||
DimensionsAndCollector dimensionsAndCollector = metrics.getByName("query/time", "historical");
|
||||
Assert.assertNotNull(dimensionsAndCollector);
|
||||
String[] dimensions = dimensionsAndCollector.getDimensions();
|
||||
Assert.assertEquals("dataSource", dimensions[0]);
|
||||
Assert.assertEquals("type", dimensions[1]);
|
||||
Assert.assertEquals(1000.0, dimensionsAndCollector.getConversionFactor(), 0.0);
|
||||
Assert.assertTrue(dimensionsAndCollector.getCollector() instanceof Histogram);
|
||||
|
||||
DimensionsAndCollector d = metrics.getByName("segment/loadQueue/count", "historical");
|
||||
Assert.assertNotNull(d);
|
||||
String[] dims = d.getDimensions();
|
||||
Assert.assertEquals("server", dims[0]);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.prometheus;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.prometheus.client.Collector;
|
||||
import io.prometheus.client.CollectorRegistry;
|
||||
import io.prometheus.client.exporter.PushGateway;
|
||||
import org.apache.druid.java.util.emitter.core.Emitter;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.easymock.EasyMock.anyObject;
|
||||
import static org.easymock.EasyMock.anyString;
|
||||
import static org.easymock.EasyMock.mock;
|
||||
|
||||
|
||||
public class PrometheusEmitterTest
|
||||
{
|
||||
@Test
|
||||
public void testEmitter()
|
||||
{
|
||||
PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null);
|
||||
PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule();
|
||||
Emitter emitter = prometheusEmitterModule.getEmitter(config);
|
||||
ServiceMetricEvent build = ServiceMetricEvent.builder()
|
||||
.setDimension("server", "druid-data01.vpc.region")
|
||||
.build("segment/loadQueue/count", 10)
|
||||
.build(ImmutableMap.of("service", "historical"));
|
||||
Assert.assertEquals("historical", build.getService());
|
||||
Assert.assertFalse(build.getUserDims().isEmpty());
|
||||
emitter.emit(build);
|
||||
Double count = CollectorRegistry.defaultRegistry.getSampleValue(
|
||||
"druid_segment_loadqueue_count", new String[]{"server"}, new String[]{"druid_data01_vpc_region"}
|
||||
);
|
||||
Assert.assertEquals(10, count.intValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmitterMetric()
|
||||
{
|
||||
PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace", null, 0, "pushgateway");
|
||||
PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule();
|
||||
Emitter emitter = prometheusEmitterModule.getEmitter(config);
|
||||
ServiceMetricEvent build = ServiceMetricEvent.builder()
|
||||
.setDimension("dataSource", "test")
|
||||
.setDimension("taskType", "index_parallel")
|
||||
.build("task/run/time", 500)
|
||||
.build(ImmutableMap.of("service", "overlord"));
|
||||
emitter.emit(build);
|
||||
double assertEpsilon = 0.0001;
|
||||
Assert.assertEquals(0.0, CollectorRegistry.defaultRegistry.getSampleValue(
|
||||
"namespace_task_run_time_bucket", new String[]{"dataSource", "taskType", "le"}, new String[]{"test", "index_parallel", "0.1"}
|
||||
), assertEpsilon);
|
||||
Assert.assertEquals(1.0, CollectorRegistry.defaultRegistry.getSampleValue(
|
||||
"namespace_task_run_time_bucket", new String[]{"dataSource", "taskType", "le"}, new String[]{"test", "index_parallel", "0.5"}
|
||||
), assertEpsilon);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmitterStart()
|
||||
{
|
||||
PrometheusEmitterConfig exportEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "namespace1", null, 0, null);
|
||||
PrometheusEmitter exportEmitter = new PrometheusEmitter(exportEmitterConfig);
|
||||
exportEmitter.start();
|
||||
Assert.assertNotNull(exportEmitter.getServer());
|
||||
|
||||
PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace2", null, 0, "pushgateway");
|
||||
PrometheusEmitter pushEmitter = new PrometheusEmitter(pushEmitterConfig);
|
||||
pushEmitter.start();
|
||||
Assert.assertNotNull(pushEmitter.getPushGateway());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmitterPush() throws IOException
|
||||
{
|
||||
PrometheusEmitterConfig emitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace3", null, 0, "pushgateway");
|
||||
|
||||
PushGateway mockPushGateway = mock(PushGateway.class);
|
||||
mockPushGateway.push(anyObject(Collector.class), anyString(), anyObject(ImmutableMap.class));
|
||||
|
||||
PrometheusEmitter emitter = new PrometheusEmitter(emitterConfig);
|
||||
emitter.start();
|
||||
emitter.setPushGateway(mockPushGateway);
|
||||
ServiceMetricEvent build = ServiceMetricEvent.builder()
|
||||
.setDimension("task", "index_parallel")
|
||||
.build("task/run/time", 500)
|
||||
.build(ImmutableMap.of("service", "peon"));
|
||||
emitter.emit(build);
|
||||
emitter.flush();
|
||||
}
|
||||
}
|
|
@ -110,7 +110,7 @@
|
|||
"jvm/gc/count" : { "dimensions" : ["gcName", "gcGen"], "type" : "count" },
|
||||
"jvm/gc/cpu" : { "dimensions" : ["gcName", "gcGen"], "type" : "count" },
|
||||
|
||||
"ingest/events/buffered" : { "dimensions" : ["serviceName, bufferCapacity"], "type" : "gauge"},
|
||||
"ingest/events/buffered" : { "dimensions" : ["serviceName", "bufferCapacity"], "type" : "gauge"},
|
||||
|
||||
"sys/swap/free" : { "dimensions" : [], "type" : "gauge"},
|
||||
"sys/swap/max" : { "dimensions" : [], "type" : "gauge"},
|
||||
|
|
1
pom.xml
1
pom.xml
|
@ -198,6 +198,7 @@
|
|||
<module>extensions-contrib/influxdb-emitter</module>
|
||||
<module>extensions-contrib/gce-extensions</module>
|
||||
<module>extensions-contrib/aliyun-oss-extensions</module>
|
||||
<module>extensions-contrib/prometheus-emitter</module>
|
||||
<!-- distribution packaging -->
|
||||
<module>distribution</module>
|
||||
</modules>
|
||||
|
|
|
@ -583,6 +583,11 @@ com.microsoft.sqlserver.jdbc.SQLServerDriver
|
|||
sqljdbc
|
||||
- ../docs/development/extensions-contrib/statsd.md
|
||||
convertRange
|
||||
- ../docs/development/extensions-contrib/prometheus.md
|
||||
HTTPServer
|
||||
conversionFactor
|
||||
prometheus
|
||||
Pushgateway
|
||||
- ../docs/development/extensions-contrib/tdigestsketch-quantiles.md
|
||||
postAggregator
|
||||
quantileFromTDigestSketch
|
||||
|
|
Loading…
Reference in New Issue