From a57c28e9ce2829f45e31d29276bf14d9f7afbb50 Mon Sep 17 00:00:00 2001 From: Tianxin Zhao <39429932+Tiaaa@users.noreply.github.com> Date: Tue, 9 Mar 2021 14:37:31 -0800 Subject: [PATCH] prometheus metric exporter (#10412) * prometheus-emitter * use existing jetty server to expose prometheus collection endpoint * unused variables * better variable names * removed unused dependencies * more metric definitions * reorganize * use prometheus HTTPServer instead of hooking into Jetty server * temporary empty help string * temporary non-empty help. fix incorrect dimension value in JSON (also updated statsd json) * added full help text. added metric conversion factor for timers that are not using seconds. Correct metric dimension name in documentation * added documentation for prometheus emitter * safety for invalid labelNames * fix travis checks * Unit test and better sanitization of metrics names and label values * add precondition to check namespace against regex * use precompiled regex * remove static imports. fix metric types * better docs. fix possible NPE in PrometheusEmitterConfig. Guard against multiple calls to PrometheusEmitter.start() * Update regex for label-value replacements to allow internal numeric values. Additional tests * Adds missing license header updates website/.spelling to add words used in prometheus-emitter docs. updates docs/operations/metrics.md to correct the spelling of bufferPoolName * fixes version in extensions-contrib/prometheus-emitter * fix style guide errors * update import ordering * add another word to website/.spelling * remove unthrown declared exception * remove unused import * Pushgateway strategy for metrics * typo * Format fix and nullable strategy * Update pom file for prometheus-emitter * code review comments. 
Counter to gauge for cache metrics, periodical task to pushGateway * Syntax fix * Dimension label regex include numeric character back, fix previous commit * bump prometheus-emitter pom dev version * Remove scheduled task inside poen that push metrics * Fix checkstyle * Unit test coverage * Unit test coverage * Spelling * Doc fix * spelling Co-authored-by: Michael Schiff Co-authored-by: Michael Schiff Co-authored-by: Tianxin Zhao Co-authored-by: Tianxin Zhao --- .../extensions-contrib/prometheus.md | 70 +++++++ extensions-contrib/prometheus-emitter/pom.xml | 123 ++++++++++++ .../prometheus/DimensionsAndCollector.java | 51 +++++ .../druid/emitter/prometheus/Metrics.java | 155 +++++++++++++++ .../emitter/prometheus/PrometheusEmitter.java | 176 ++++++++++++++++++ .../prometheus/PrometheusEmitterConfig.java | 103 ++++++++++ .../prometheus/PrometheusEmitterModule.java | 62 ++++++ ...rg.apache.druid.initialization.DruidModule | 16 ++ .../src/main/resources/defaultMetrics.json | 128 +++++++++++++ .../druid/emitter/prometheus/MetricsTest.java | 45 +++++ .../prometheus/PrometheusEmitterTest.java | 112 +++++++++++ .../resources/defaultMetricDimensions.json | 2 +- pom.xml | 1 + website/.spelling | 5 + 14 files changed, 1048 insertions(+), 1 deletion(-) create mode 100644 docs/development/extensions-contrib/prometheus.md create mode 100644 extensions-contrib/prometheus-emitter/pom.xml create mode 100644 extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java create mode 100644 extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/Metrics.java create mode 100644 extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java create mode 100644 extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java create mode 100644 
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterModule.java create mode 100644 extensions-contrib/prometheus-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule create mode 100644 extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json create mode 100644 extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/MetricsTest.java create mode 100644 extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java diff --git a/docs/development/extensions-contrib/prometheus.md b/docs/development/extensions-contrib/prometheus.md new file mode 100644 index 00000000000..2c7ea089ab1 --- /dev/null +++ b/docs/development/extensions-contrib/prometheus.md @@ -0,0 +1,70 @@ +--- +id: prometheus +title: "Prometheus Emitter" +--- + + + + +To use this Apache Druid extension, make sure to [include](../../development/extensions.md#loading-extensions) the `prometheus-emitter` extension. + +## Introduction + +This extension exposes [Druid metrics](https://druid.apache.org/docs/latest/operations/metrics.html) for collection by a Prometheus server (https://prometheus.io/). +The emitter is enabled by setting `druid.emitter=prometheus` in the [configs](https://druid.apache.org/docs/latest/configuration/index.html#emitting-metrics) or by including `prometheus` in the composing emitter list. + +## Configuration + +All the configuration parameters for the Prometheus emitter are under `druid.emitter.prometheus`. + +|property|description|required?|default| +|--------|-----------|---------|-------| +|`druid.emitter.prometheus.strategy`|The strategy used to expose Prometheus metrics. The default strategy, `exporter`, exposes metrics for scraping. Only peon tasks (short-lived jobs) need to use the `pushgateway` strategy.|no|exporter| +|`druid.emitter.prometheus.port`|The port on which to expose the Prometheus HTTPServer. 
Required if using the `exporter` strategy.|no|none| +|`druid.emitter.prometheus.namespace`|Optional metric namespace. Must match the regex `[a-zA-Z_:][a-zA-Z0-9_:]*`|no|"druid"| +|`druid.emitter.prometheus.dimensionMapPath`|JSON file defining the Prometheus metric type, desired dimensions, help text, and conversionFactor for every Druid metric.|no|Default mapping provided. See below.| +|`druid.emitter.prometheus.pushGatewayAddress`|Pushgateway address. Required if using the `pushgateway` strategy.|no|none| + + +### Metric names + +All metric names and label values are reformatted to match Prometheus standards. +- For names: all characters which are not alphanumeric, underscores, or colons (matching `[^a-zA-Z_:][^a-zA-Z0-9_:]*`) are replaced with `_` +- For label values: all characters which are not alphanumeric or underscores (matching `[^a-zA-Z0-9_][^a-zA-Z0-9_]*`) are replaced with `_` + +### Metric mapping + +Each metric to be collected by Prometheus must specify a type, one of `[timer, count, gauge]`. The Prometheus emitter expects this mapping to +be provided as a JSON file. Additionally, this mapping specifies which dimensions should be included for each metric. Prometheus expects +histogram timers to use seconds as the base unit. Timers which do not use seconds as a base unit can set `conversionFactor`; the emitted value is divided by +this factor to convert it to seconds (for example, `1000.0` for a timer reported in milliseconds). If the user does not specify their own JSON file, a default mapping is used. All +metrics are expected to be mapped. Metrics which are not mapped will not be tracked. +The Prometheus metric path is organized using the following schema: +`<metric name> : { "dimensions" : <dimension list>, "type" : <timer|count|gauge>, "conversionFactor" : <conversion factor>, "help" : <help text> }` +e.g. +`"query/time" : { "dimensions" : ["dataSource", "type"], "conversionFactor": 1000.0, "type" : "timer", "help": "Seconds taken to complete a query."}` + +For metrics which are emitted from multiple services with different dimensions, the metric name is prefixed with +the service name. +e.g. 
+`"coordinator-segment/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, + "historical-segment/count" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge" }` + +For most use-cases, the default mapping is sufficient. diff --git a/extensions-contrib/prometheus-emitter/pom.xml b/extensions-contrib/prometheus-emitter/pom.xml new file mode 100644 index 00000000000..f7d7536991f --- /dev/null +++ b/extensions-contrib/prometheus-emitter/pom.xml @@ -0,0 +1,123 @@ + + + + + druid + org.apache.druid + 0.22.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + org.apache.druid.extensions.contrib + prometheus-emitter + prometheus-emitter + Extension support for collecting Druid metrics with Prometheus + + + + org.apache.druid + druid-core + ${project.parent.version} + provided + + + io.prometheus + simpleclient + 0.7.0 + + + io.prometheus + simpleclient_httpserver + 0.7.0 + + + io.prometheus + simpleclient_pushgateway + 0.7.0 + + + com.google.code.findbugs + jsr305 + provided + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + joda-time + joda-time + provided + + + com.google.guava + guava + provided + + + com.google.inject + guice + provided + + + com.fasterxml.jackson.core + jackson-databind + provided + + + com.fasterxml.jackson.core + jackson-core + provided + + + junit + junit + test + + + org.easymock + easymock + test + + + pl.pragmatists + JUnitParams + test + + + org.apache.druid + druid-server + ${project.parent.version} + test-jar + test + + + org.apache.druid + druid-processing + ${project.parent.version} + test-jar + test + + + diff --git a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java new file mode 100644 index 00000000000..ede4977aeee --- /dev/null +++ 
b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.prometheus; + +import io.prometheus.client.SimpleCollector; + +public class DimensionsAndCollector +{ + private final String[] dimensions; + private final SimpleCollector collector; + private final double conversionFactor; + + DimensionsAndCollector(String[] dimensions, SimpleCollector collector, double conversionFactor) + { + this.dimensions = dimensions; + this.collector = collector; + this.conversionFactor = conversionFactor; + } + + public String[] getDimensions() + { + return dimensions; + } + + public SimpleCollector getCollector() + { + return collector; + } + + public double getConversionFactor() + { + return conversionFactor; + } +} diff --git a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/Metrics.java b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/Metrics.java new file mode 100644 index 00000000000..7006da3ca8a --- /dev/null +++ 
b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/Metrics.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.prometheus; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.Histogram; +import io.prometheus.client.SimpleCollector; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedSet; +import java.util.regex.Pattern; + +public class Metrics +{ + + private static final Logger log = new Logger(Metrics.class); + private final Map<String, DimensionsAndCollector> registeredMetrics = new HashMap<>(); + private final ObjectMapper mapper = new 
ObjectMapper(); + public static final Pattern PATTERN = Pattern.compile("[^a-zA-Z_:][^a-zA-Z0-9_:]*"); + + public DimensionsAndCollector getByName(String name, String service) + { + if (registeredMetrics.containsKey(name)) { + return registeredMetrics.get(name); + } else { + return registeredMetrics.getOrDefault(service + "_" + name, null); + } + } + + public Metrics(String namespace, String path) + { + Map<String, Metric> metrics = readConfig(path); + for (Map.Entry<String, Metric> entry : metrics.entrySet()) { + String name = entry.getKey(); + Metric metric = entry.getValue(); + Metric.Type type = metric.type; + String[] dimensions = metric.dimensions.toArray(new String[0]); + String formattedName = PATTERN.matcher(StringUtils.toLowerCase(name)).replaceAll("_"); + SimpleCollector collector = null; + if (Metric.Type.count.equals(type)) { + collector = new Counter.Builder() + .namespace(namespace) + .name(formattedName) + .labelNames(dimensions) + .help(metric.help) + .register(); + } else if (Metric.Type.gauge.equals(type)) { + collector = new Gauge.Builder() + .namespace(namespace) + .name(formattedName) + .labelNames(dimensions) + .help(metric.help) + .register(); + } else if (Metric.Type.timer.equals(type)) { + collector = new Histogram.Builder() + .namespace(namespace) + .name(formattedName) + .labelNames(dimensions) + .buckets(.1, .25, .5, .75, 1, 2.5, 5, 7.5, 10, 30, 60, 120, 300) + .help(metric.help) + .register(); + } else { + log.error("Unrecognized metric type [%s]", type); + } + + if (collector != null) { + registeredMetrics.put(name, new DimensionsAndCollector(dimensions, collector, metric.conversionFactor)); + } + } + + } + + private Map<String, Metric> readConfig(String path) + { + try { + InputStream is; + if (Strings.isNullOrEmpty(path)) { + log.info("Using default metric configuration"); + is = this.getClass().getClassLoader().getResourceAsStream("defaultMetrics.json"); + } else { + log.info("Using metric configuration at [%s]", path); + is = new FileInputStream(new File(path)); + } + return 
mapper.readerFor(new TypeReference<Map<String, Metric>>() + { + }).readValue(is); + } + catch (IOException e) { + throw new ISE(e, "Failed to parse metric configuration"); + } + } + + public Map<String, DimensionsAndCollector> getRegisteredMetrics() + { + return registeredMetrics; + } + + public static class Metric + { + public final SortedSet<String> dimensions; + public final Type type; + public final String help; + public final double conversionFactor; + + @JsonCreator + public Metric( + @JsonProperty("dimensions") SortedSet<String> dimensions, + @JsonProperty("type") Type type, + @JsonProperty("help") String help, + @JsonProperty("conversionFactor") double conversionFactor + ) + { + this.dimensions = dimensions; + this.type = type; + this.help = help; + this.conversionFactor = conversionFactor; + } + + public enum Type + { + count, gauge, timer + } + } +} diff --git a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java new file mode 100644 index 00000000000..19d047fd29c --- /dev/null +++ b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.prometheus; + + +import com.google.common.collect.ImmutableMap; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.Histogram; +import io.prometheus.client.exporter.HTTPServer; +import io.prometheus.client.exporter.PushGateway; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.core.Emitter; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; + +import java.io.IOException; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * + */ +public class PrometheusEmitter implements Emitter +{ + private static final Logger log = new Logger(PrometheusEmitter.class); + private final Metrics metrics; + private final PrometheusEmitterConfig config; + private final PrometheusEmitterConfig.Strategy strategy; + private static final Pattern PATTERN = Pattern.compile("[^a-zA-Z0-9_][^a-zA-Z0-9_]*"); + + private HTTPServer server; + private PushGateway pushGateway; + private String identifier; + + static PrometheusEmitter of(PrometheusEmitterConfig config) + { + return new PrometheusEmitter(config); + } + + public PrometheusEmitter(PrometheusEmitterConfig config) + { + this.config = config; + this.strategy = config.getStrategy(); + metrics = new Metrics(config.getNamespace(), config.getDimensionMapPath()); + } + + + @Override + public void start() + { + if (strategy.equals(PrometheusEmitterConfig.Strategy.exporter)) { + if (server == null) { + try { + server = new HTTPServer(config.getPort()); + } + catch (IOException e) { + log.error(e, "Unable to start prometheus HTTPServer"); + } + } else { + log.error("HTTPServer is already started"); + } + } else if (strategy.equals(PrometheusEmitterConfig.Strategy.pushgateway)) { + pushGateway = new 
PushGateway(config.getPushGatewayAddress()); + } + + } + + @Override + public void emit(Event event) + { + if (event instanceof ServiceMetricEvent) { + emitMetric((ServiceMetricEvent) event); + } + } + + private void emitMetric(ServiceMetricEvent metricEvent) + { + String name = metricEvent.getMetric(); + String service = metricEvent.getService(); + Map<String, Object> userDims = metricEvent.getUserDims(); + identifier = (userDims.get("task") == null ? metricEvent.getHost() : (String) userDims.get("task")); + Number value = metricEvent.getValue(); + + DimensionsAndCollector metric = metrics.getByName(name, service); + if (metric != null) { + String[] labelValues = new String[metric.getDimensions().length]; + String[] labelNames = metric.getDimensions(); + for (int i = 0; i < labelValues.length; i++) { + String labelName = labelNames[i]; + //labelName is controlled by the user. Instead of potential NPE on invalid labelName we use "unknown" as the dimension value + Object userDim = userDims.get(labelName); + labelValues[i] = userDim != null ? 
PATTERN.matcher(userDim.toString()).replaceAll("_") : "unknown"; + } + + if (metric.getCollector() instanceof Counter) { + ((Counter) metric.getCollector()).labels(labelValues).inc(value.doubleValue()); + } else if (metric.getCollector() instanceof Gauge) { + ((Gauge) metric.getCollector()).labels(labelValues).set(value.doubleValue()); + } else if (metric.getCollector() instanceof Histogram) { + ((Histogram) metric.getCollector()).labels(labelValues).observe(value.doubleValue() / metric.getConversionFactor()); + } else { + log.error("Unrecognized metric type [%s]", metric.getCollector().getClass()); + } + } else { + log.debug("Unmapped metric [%s]", name); + } + } + + private void pushMetric() + { + Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics(); + try { + for (DimensionsAndCollector collector : map.values()) { + if (config.getNamespace() != null) { + pushGateway.push(collector.getCollector(), config.getNamespace(), ImmutableMap.of(config.getNamespace(), identifier)); + } + } + } + catch (IOException e) { + log.error(e, "Unable to push prometheus metrics to pushGateway"); + } + } + + @Override + public void flush() + { + if (pushGateway != null) { + pushMetric(); + } + } + + @Override + public void close() + { + if (strategy.equals(PrometheusEmitterConfig.Strategy.exporter)) { + if (server != null) { + server.stop(); + } + } else { + flush(); + } + } + + public HTTPServer getServer() + { + return server; + } + + public PushGateway getPushGateway() + { + return pushGateway; + } + + public void setPushGateway(PushGateway pushGateway) + { + this.pushGateway = pushGateway; + } +} diff --git a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java new file mode 100644 index 00000000000..8a6e7a715a7 --- /dev/null +++ 
b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.prometheus; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import javax.annotation.Nullable; +import java.util.regex.Pattern; + +/** + * + */ +public class PrometheusEmitterConfig +{ + + static final Pattern PATTERN = Pattern.compile("[a-zA-Z_:][a-zA-Z0-9_:]*"); + + @JsonProperty + private final Strategy strategy; + + @JsonProperty + @Nullable + private final String namespace; + + @JsonProperty + @Nullable + private final String dimensionMapPath; + + @JsonProperty + @Nullable + private final Integer port; + + @JsonProperty + @Nullable + private final String pushGatewayAddress; + + @JsonCreator + public PrometheusEmitterConfig( + @JsonProperty("strategy") @Nullable Strategy strategy, + @JsonProperty("namespace") @Nullable String namespace, + @JsonProperty("dimensionMapPath") @Nullable String dimensionMapPath, + @JsonProperty("port") @Nullable Integer port, + 
@JsonProperty("pushGatewayAddress") @Nullable String pushGatewayAddress + ) + { + + this.strategy = strategy != null ? strategy : Strategy.exporter; + this.namespace = namespace != null ? namespace : "druid"; + Preconditions.checkArgument(PATTERN.matcher(this.namespace).matches(), "Invalid namespace " + this.namespace); + this.dimensionMapPath = dimensionMapPath; + this.port = port; + this.pushGatewayAddress = pushGatewayAddress; + } + + public String getNamespace() + { + return namespace; + } + + public String getDimensionMapPath() + { + return dimensionMapPath; + } + + public int getPort() + { + return port; + } + + public String getPushGatewayAddress() + { + return pushGatewayAddress; + } + + public Strategy getStrategy() + { + return strategy; + } + + public enum Strategy + { + exporter, pushgateway + } +} diff --git a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterModule.java b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterModule.java new file mode 100644 index 00000000000..eb74aa7022b --- /dev/null +++ b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterModule.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.prometheus; + + +import com.fasterxml.jackson.databind.Module; +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.google.inject.name.Named; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.emitter.core.Emitter; + +import java.util.Collections; +import java.util.List; + + +/** + * + */ +public class PrometheusEmitterModule implements DruidModule +{ + private static final String EMITTER_TYPE = "prometheus"; + + @Override + public List<? extends Module> getJacksonModules() + { + return Collections.emptyList(); + } + + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, PrometheusEmitterConfig.class); + } + + @Provides + @ManageLifecycle + @Named(EMITTER_TYPE) + public Emitter getEmitter(PrometheusEmitterConfig config) + { + return PrometheusEmitter.of(config); + } +} diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/prometheus-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule new file mode 100644 index 00000000000..da92fc068bf --- /dev/null +++ b/extensions-contrib/prometheus-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.druid.emitter.prometheus.PrometheusEmitterModule diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json new file mode 100644 index 00000000000..c89d41e68ea --- /dev/null +++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json @@ -0,0 +1,128 @@ +{ + "query/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to complete a query."}, + "query/bytes" : { "dimensions" : ["dataSource", "type"], "type" : "count", "help": "Number of bytes returned in query response."}, + "query/node/time" : { "dimensions" : ["server"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual historical/realtime processes."}, + "query/node/ttfb" : { "dimensions" : ["server"], "type" : "timer", "help": "Time to first byte. Seconds elapsed until Broker starts receiving the response from individual historical/realtime processes."}, + "query/node/bytes" : { "dimensions" : ["server"], "type" : "count", "help": "Number of bytes returned from querying individual historical/realtime processes."}, + "query/node/backpressure": { "dimensions" : ["server"], "type" : "timer", "help": "Seconds that the channel to this process has spent suspended due to backpressure."}, + "query/intervalChunk/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Only emitted if interval chunking is enabled. 
Seconds required to query an interval chunk. This metric is deprecated and will be removed in the future because interval chunking is deprecated."}, + + "query/segment/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual segment. Includes time to page in the segment from disk."}, + "query/wait/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent waiting for a segment to be scanned."}, + "segment/scan/pending" : { "dimensions" : [], "type" : "gauge", "help": "Number of segments in queue waiting to be scanned."}, + "query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual segment or hit the cache (if it is enabled on the Historical process)."}, + "query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": 1000000.0, "help": "Seconds of CPU time taken to complete a query"}, + + "query/count" : { "dimensions" : [], "type" : "count", "help": "Number of total queries" }, + "query/success/count" : { "dimensions" : [], "type" : "count", "help": "Number of queries successfully processed"}, + "query/failed/count" : { "dimensions" : [], "type" : "count", "help": "Number of failed queries"}, + "query/interrupted/count" : { "dimensions" : [], "type" : "count", "help": "Number of queries interrupted due to cancellation or timeout"}, + + "query/cache/delta/numEntries" : { "dimensions" : [], "type" : "gauge", "help": "Number of entries in cache"}, + "query/cache/delta/sizeBytes" : { "dimensions" : [], "type" : "gauge", "help": "Size of cache in bytes."}, + "query/cache/delta/hits" : { "dimensions" : [], "type" : "gauge", "help": "Number of cache hits."}, + "query/cache/delta/misses" : { "dimensions" : [], "type" : "gauge", "help": "Number of cache misses."}, + "query/cache/delta/evictions" : { "dimensions" : [], "type" : "gauge", "help": "Number of 
cache evictions."}, + "query/cache/delta/hitRate" : { "dimensions" : [], "type" : "gauge","help": "Cache hit rate."}, + "query/cache/delta/averageBytes" : { "dimensions" : [], "type" : "gauge", "help": "Average size of record in bytes"}, + "query/cache/delta/timeouts" : { "dimensions" : [], "type" : "gauge", "help": "Number of cache timeouts"}, + "query/cache/delta/errors" : { "dimensions" : [], "type" : "gauge", "help": "Number of cache errors."}, + + "query/cache/total/numEntries" : { "dimensions" : [], "type" : "gauge","help": "Total number of entries in cache" }, + "query/cache/total/sizeBytes" : { "dimensions" : [], "type" : "gauge", "help": "Total size of cache in bytes."}, + "query/cache/total/hits" : { "dimensions" : [], "type" : "gauge", "help": "Total number of cache hits."}, + "query/cache/total/misses" : { "dimensions" : [], "type" : "gauge", "help": "Total number of cache misses." }, + "query/cache/total/evictions" : { "dimensions" : [], "type" : "gauge", "help": "Total number of cache evictions."}, + "query/cache/total/hitRate" : { "dimensions" : [], "type" : "gauge", "help": "Total cache hit rate"}, + "query/cache/total/averageBytes" : { "dimensions" : [], "type" : "gauge", "help": "Total average record size in bytes"}, + "query/cache/total/timeouts" : { "dimensions" : [], "type" : "gauge", "help": "Total number of cache timeouts"}, + "query/cache/total/errors" : { "dimensions" : [], "type" : "gauge", "help": "Total number of cache errors" }, + + "ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because they are outside the windowPeriod."}, + "ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are unparseable." 
}, + "ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are duplicated."}, + "ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events successfully processed per emission period." }, + "ingest/events/messageGap" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Time gap between the data time in event and current system time."}, + "ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of Druid rows persisted."}, + "ingest/persists/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of times persist occurred." }, + "ingest/persists/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent doing intermediate persist."}, + "ingest/persists/cpu" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000000000.0, "help": "Cpu time in Seconds spent on doing intermediate persist." }, + "ingest/persists/backPressure" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Seconds spent creating persist tasks and blocking waiting for them to finish." }, + "ingest/persists/failed" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of persists that failed." }, + "ingest/handoff/failed" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of handoffs that failed." 
}, + "ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent merging intermediate segments" }, + "ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000000000.0, "help": "Cpu time in Seconds spent on merging intermediate segments."}, + + "ingest/kafka/lag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."}, + "ingest/kafka/maxLag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."}, + "ingest/kafka/avgLag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."}, + + "task/success/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included."}, + "task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included."}, + "task/running/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included."}, + "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current pending tasks. 
This metric is only available if the TaskCountStatsMonitor module is included."}, + "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included."}, + "task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to run a task."}, + + "segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Size in bytes of new segments created." }, + "segment/moved/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Size in bytes of segments moved/archived via the Move Task." }, + "segment/nuked/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Size in bytes of segments deleted via the Kill Task." }, + + "segment/assigned/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments assigned to be loaded in the cluster."}, + "segment/moved/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments moved in the cluster." }, + "segment/dropped/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments dropped due to being overshadowed." 
}, + "segment/deleted/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments dropped due to rules."}, + "segment/unneeded/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments dropped due to being marked as unused."}, + "segment/unavailable/count" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Number of segments (not including replicas) left to load until segments that should be loaded in the cluster are available for queries."}, + "segment/underReplicated/count" : { "dimensions" : ["dataSource", "tier"], "type" : "gauge", "help": "Number of segments (including replicas) left to load until segments that should be loaded in the cluster are available for queries."}, + "segment/cost/raw" : { "dimensions" : ["tier"], "type" : "count", "help": "Used in cost balancing. The raw cost of hosting segments."}, + "segment/cost/normalization" : { "dimensions" : ["tier"], "type" : "count", "help": "Used in cost balancing. The normalization of hosting segments."}, + "segment/cost/normalized" : { "dimensions" : ["tier"], "type" : "count", "help": "Used in cost balancing. 
The normalized cost of hosting segments."}, + "segment/loadQueue/size" : { "dimensions" : ["server"], "type" : "gauge", "help": "Size in bytes of segments to load."}, + "segment/loadQueue/failed" : { "dimensions" : ["server"], "type" : "gauge", "help": "Number of segments that failed to load."}, + "segment/loadQueue/count" : { "dimensions" : ["server"], "type" : "gauge", "help": "Number of segments to load."}, + "segment/dropQueue/count" : { "dimensions" : ["server"], "type" : "gauge", "help": "Number of segments to drop."}, + "segment/size" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Size in bytes of available segments."}, + "segment/overShadowed/count" : { "dimensions" : [], "type" : "gauge", "help": "Number of overShadowed segments."}, + + "segment/max" : { "dimensions" : [], "type" : "gauge", "help": "Maximum byte limit available for segments."}, + "segment/used" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge", "help": "Bytes used for served segments."}, + "segment/usedPercent" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge", "help": "Percentage of space used by served segments."}, + "segment/pendingDelete" : { "dimensions" : [], "type" : "gauge", "help": "On-disk size in bytes of segments that are waiting to be cleared out"}, + + "jvm/pool/committed" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge", "help": "Committed pool."}, + "jvm/pool/init" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge", "help": "Initial pool."}, + "jvm/pool/max" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge", "help": "Max pool."}, + "jvm/pool/used" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge", "help": "Pool used."}, + "jvm/bufferpool/count" : { "dimensions" : ["bufferpoolName"], "type" : "gauge", "help": "bufferpool count"}, + "jvm/bufferpool/used" : { "dimensions" : ["bufferpoolName"], "type" : "gauge", "help": "bufferpool used"}, + "jvm/bufferpool/capacity" : { 
"dimensions" : ["bufferpoolName"], "type" : "gauge", "help": "bufferpool capacity"}, + "jvm/mem/init" : { "dimensions" : ["memKind"], "type" : "gauge", "help": "Initial memory"}, + "jvm/mem/max" : { "dimensions" : ["memKind"], "type" : "gauge", "help": "Max memory"}, + "jvm/mem/used" : { "dimensions" : ["memKind"], "type" : "gauge", "help": "Used memory"}, + "jvm/mem/committed" : { "dimensions" : ["memKind"], "type" : "gauge", "help": "Committed memory"}, + "jvm/gc/count" : { "dimensions" : ["gcName"], "type" : "count", "help": "Garbage collection count"}, + "jvm/gc/cpu" : { "dimensions" : ["gcName"], "type" : "count", "help": "Count of CPU time in Nanoseconds spent on garbage collection. Note: `jvm/gc/cpu` represents the total time over multiple GC cycles; divide by `jvm/gc/count` to get the mean GC time per cycle."}, + + "ingest/events/buffered" : { "dimensions" : ["serviceName", "bufferCapacity"], "type" : "gauge", "help": "Number of events queued in the EventReceiverFirehose's buffer"}, + + "sys/swap/free" : { "dimensions" : [], "type" : "gauge", "help": "Free swap"}, + "sys/swap/max" : { "dimensions" : [], "type" : "gauge", "help": "Max swap"}, + "sys/swap/pageIn" : { "dimensions" : [], "type" : "gauge", "help": "Paged in swap"}, + "sys/swap/pageOut" : { "dimensions" : [], "type" : "gauge", "help": "Paged out swap"}, + "sys/disk/write/count" : { "dimensions" : ["fsDevName"], "type" : "count", "help": "Writes to disk."}, + "sys/disk/read/count" : { "dimensions" : ["fsDevName"], "type" : "count", "help": "Reads from disk."}, + "sys/disk/write/size" : { "dimensions" : ["fsDevName"], "type" : "count", "help": "Bytes written to disk. Can be used to determine how much paging is occurring with regards to segments."}, + "sys/disk/read/size" : { "dimensions" : ["fsDevName"], "type" : "count", "help": "Bytes read from disk. 
Can be used to determine how much paging is occurring with regards to segments."}, + "sys/net/write/size" : { "dimensions" : [], "type" : "count", "help": "Bytes written to the network."}, + "sys/net/read/size" : { "dimensions" : [], "type" : "count", "help": "Bytes read from the network."}, + "sys/fs/used" : { "dimensions" : ["fsDevName", "fsDirName", "fsTypeName", "fsSysTypeName", "fsOptions"], "type" : "gauge", "help": "Filesystem bytes used."}, + "sys/fs/max" : { "dimensions" : ["fsDevName", "fsDirName", "fsTypeName", "fsSysTypeName", "fsOptions"], "type" : "gauge", "help": "Filesystem bytes max."}, + "sys/mem/used" : { "dimensions" : [], "type" : "gauge", "help": "Memory used."}, + "sys/mem/max" : { "dimensions" : [], "type" : "gauge", "help": "Memory max"}, + "sys/storage/used" : { "dimensions" : ["fsDirName"], "type" : "gauge", "help": "Disk space used."}, + "sys/cpu" : { "dimensions" : ["cpuName", "cpuTime"], "type" : "gauge", "help": "CPU used"}, + + "coordinator_segment/count" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Number of served segments."}, + "historical_segment/count" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge", "help": "Number of served segments."} +} diff --git a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/MetricsTest.java b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/MetricsTest.java new file mode 100644 index 00000000000..968b29e951d --- /dev/null +++ b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/MetricsTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.prometheus; + +import io.prometheus.client.Histogram; +import org.junit.Assert; +import org.junit.Test; + +public class MetricsTest +{ + @Test + public void testMetricsConfiguration() + { + Metrics metrics = new Metrics("test", null); + DimensionsAndCollector dimensionsAndCollector = metrics.getByName("query/time", "historical"); + Assert.assertNotNull(dimensionsAndCollector); + String[] dimensions = dimensionsAndCollector.getDimensions(); + Assert.assertEquals("dataSource", dimensions[0]); + Assert.assertEquals("type", dimensions[1]); + Assert.assertEquals(1000.0, dimensionsAndCollector.getConversionFactor(), 0.0); + Assert.assertTrue(dimensionsAndCollector.getCollector() instanceof Histogram); + + DimensionsAndCollector d = metrics.getByName("segment/loadQueue/count", "historical"); + Assert.assertNotNull(d); + String[] dims = d.getDimensions(); + Assert.assertEquals("server", dims[0]); + } +} diff --git a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java new file mode 100644 index 00000000000..c9ca139b06d --- /dev/null +++ b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java @@ -0,0 +1,112 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.prometheus; + +import com.google.common.collect.ImmutableMap; +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.PushGateway; +import org.apache.druid.java.util.emitter.core.Emitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.mock; + + +public class PrometheusEmitterTest +{ + @Test + public void testEmitter() + { + PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null); + PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); + Emitter emitter = prometheusEmitterModule.getEmitter(config); + ServiceMetricEvent build = ServiceMetricEvent.builder() + .setDimension("server", "druid-data01.vpc.region") + .build("segment/loadQueue/count", 10) + .build(ImmutableMap.of("service", "historical")); + Assert.assertEquals("historical", 
build.getService()); + Assert.assertFalse(build.getUserDims().isEmpty()); + emitter.emit(build); + Double count = CollectorRegistry.defaultRegistry.getSampleValue( + "druid_segment_loadqueue_count", new String[]{"server"}, new String[]{"druid_data01_vpc_region"} + ); + Assert.assertEquals(10, count.intValue()); + } + + @Test + public void testEmitterMetric() + { + PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace", null, 0, "pushgateway"); + PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); + Emitter emitter = prometheusEmitterModule.getEmitter(config); + ServiceMetricEvent build = ServiceMetricEvent.builder() + .setDimension("dataSource", "test") + .setDimension("taskType", "index_parallel") + .build("task/run/time", 500) + .build(ImmutableMap.of("service", "overlord")); + emitter.emit(build); + double assertEpsilon = 0.0001; + Assert.assertEquals(0.0, CollectorRegistry.defaultRegistry.getSampleValue( + "namespace_task_run_time_bucket", new String[]{"dataSource", "taskType", "le"}, new String[]{"test", "index_parallel", "0.1"} + ), assertEpsilon); + Assert.assertEquals(1.0, CollectorRegistry.defaultRegistry.getSampleValue( + "namespace_task_run_time_bucket", new String[]{"dataSource", "taskType", "le"}, new String[]{"test", "index_parallel", "0.5"} + ), assertEpsilon); + } + + @Test + public void testEmitterStart() + { + PrometheusEmitterConfig exportEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "namespace1", null, 0, null); + PrometheusEmitter exportEmitter = new PrometheusEmitter(exportEmitterConfig); + exportEmitter.start(); + Assert.assertNotNull(exportEmitter.getServer()); + + PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace2", null, 0, "pushgateway"); + PrometheusEmitter pushEmitter = new PrometheusEmitter(pushEmitterConfig); + 
pushEmitter.start(); + Assert.assertNotNull(pushEmitter.getPushGateway()); + } + + @Test + public void testEmitterPush() throws IOException + { + PrometheusEmitterConfig emitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace3", null, 0, "pushgateway"); + + PushGateway mockPushGateway = mock(PushGateway.class); + mockPushGateway.push(anyObject(Collector.class), anyString(), anyObject(ImmutableMap.class)); + + PrometheusEmitter emitter = new PrometheusEmitter(emitterConfig); + emitter.start(); + emitter.setPushGateway(mockPushGateway); + ServiceMetricEvent build = ServiceMetricEvent.builder() + .setDimension("task", "index_parallel") + .build("task/run/time", 500) + .build(ImmutableMap.of("service", "peon")); + emitter.emit(build); + emitter.flush(); + } +} diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 1b17a4c1b6d..298b440e520 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -110,7 +110,7 @@ "jvm/gc/count" : { "dimensions" : ["gcName", "gcGen"], "type" : "count" }, "jvm/gc/cpu" : { "dimensions" : ["gcName", "gcGen"], "type" : "count" }, - "ingest/events/buffered" : { "dimensions" : ["serviceName, bufferCapacity"], "type" : "gauge"}, + "ingest/events/buffered" : { "dimensions" : ["serviceName", "bufferCapacity"], "type" : "gauge"}, "sys/swap/free" : { "dimensions" : [], "type" : "gauge"}, "sys/swap/max" : { "dimensions" : [], "type" : "gauge"}, diff --git a/pom.xml b/pom.xml index 10ddf028e7f..c2ff9fc4067 100644 --- a/pom.xml +++ b/pom.xml @@ -198,6 +198,7 @@ extensions-contrib/influxdb-emitter extensions-contrib/gce-extensions extensions-contrib/aliyun-oss-extensions + extensions-contrib/prometheus-emitter distribution diff --git a/website/.spelling 
b/website/.spelling index 9160ab2e091..50a22a561ff 100644 --- a/website/.spelling +++ b/website/.spelling @@ -583,6 +583,11 @@ com.microsoft.sqlserver.jdbc.SQLServerDriver sqljdbc - ../docs/development/extensions-contrib/statsd.md convertRange +- ../docs/development/extensions-contrib/prometheus.md +HTTPServer +conversionFactor +prometheus +Pushgateway - ../docs/development/extensions-contrib/tdigestsketch-quantiles.md postAggregator quantileFromTDigestSketch