prometheus-emitter supports sending metrics to pushgateway regularly … (#13034)

* prometheus-emitter supports sending metrics to pushgateway regularly and continuously

* spell check fix

* Optimization variable name and related documents

* Update docs/development/extensions-contrib/prometheus.md

OK, it looks more conspicuous

Co-authored-by: Frank Chen <frankchen@apache.org>

* Update doc

* Update docs/development/extensions-contrib/prometheus.md

Co-authored-by: Frank Chen <frankchen@apache.org>

* When PrometheusEmitter is closed, close the scheduler

* Ensure that registeredMetrics is thread safe.

* Local variable name optimization

* Remove unnecessary white space characters

Co-authored-by: Frank Chen <frankchen@apache.org>
This commit is contained in:
DENNIS 2022-09-09 20:46:14 +08:00 committed by GitHub
parent 48c99054d0
commit dced61645f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 54 additions and 16 deletions

View File

@ -44,6 +44,7 @@ All the configuration parameters for the Prometheus emitter are under `druid.emi
| `druid.emitter.prometheus.addHostAsLabel` | Flag to include the hostname as a prometheus label. | no | false | | `druid.emitter.prometheus.addHostAsLabel` | Flag to include the hostname as a prometheus label. | no | false |
| `druid.emitter.prometheus.addServiceAsLabel` | Flag to include the druid service name (e.g. `druid/broker`, `druid/coordinator`, etc.) as a prometheus label. | no | false | | `druid.emitter.prometheus.addServiceAsLabel` | Flag to include the druid service name (e.g. `druid/broker`, `druid/coordinator`, etc.) as a prometheus label. | no | false |
| `druid.emitter.prometheus.pushGatewayAddress` | Pushgateway address. Required if using `pushgateway` strategy. | no | none | | `druid.emitter.prometheus.pushGatewayAddress` | Pushgateway address. Required if using `pushgateway` strategy. | no | none |
|`druid.emitter.prometheus.flushPeriod`|Emit metrics to Pushgateway every `flushPeriod` seconds. Required if `pushgateway` strategy is used.|no|15|
### Override properties for Peon Tasks ### Override properties for Peon Tasks

View File

@ -36,6 +36,7 @@ import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.SortedSet; import java.util.SortedSet;
@ -45,7 +46,7 @@ public class Metrics
{ {
private static final Logger log = new Logger(Metrics.class); private static final Logger log = new Logger(Metrics.class);
private final Map<String, DimensionsAndCollector> registeredMetrics = new HashMap<>(); private final Map<String, DimensionsAndCollector> registeredMetrics;
private final ObjectMapper mapper = new ObjectMapper(); private final ObjectMapper mapper = new ObjectMapper();
public static final Pattern PATTERN = Pattern.compile("[^a-zA-Z_:][^a-zA-Z0-9_:]*"); public static final Pattern PATTERN = Pattern.compile("[^a-zA-Z_:][^a-zA-Z0-9_:]*");
@ -63,6 +64,7 @@ public class Metrics
public Metrics(String namespace, String path, boolean isAddHostAsLabel, boolean isAddServiceAsLabel) public Metrics(String namespace, String path, boolean isAddHostAsLabel, boolean isAddServiceAsLabel)
{ {
Map<String, DimensionsAndCollector> registeredMetrics = new HashMap<>();
Map<String, Metric> metrics = readConfig(path); Map<String, Metric> metrics = readConfig(path);
for (Map.Entry<String, Metric> entry : metrics.entrySet()) { for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
String name = entry.getKey(); String name = entry.getKey();
@ -110,7 +112,7 @@ public class Metrics
registeredMetrics.put(name, new DimensionsAndCollector(dimensions, collector, metric.conversionFactor)); registeredMetrics.put(name, new DimensionsAndCollector(dimensions, collector, metric.conversionFactor));
} }
} }
this.registeredMetrics = Collections.unmodifiableMap(registeredMetrics);
} }
private Map<String, Metric> readConfig(String path) private Map<String, Metric> readConfig(String path)

View File

@ -27,6 +27,7 @@ import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram; import io.prometheus.client.Histogram;
import io.prometheus.client.exporter.HTTPServer; import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.exporter.PushGateway; import io.prometheus.client.exporter.PushGateway;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.core.Event;
@ -36,6 +37,8 @@ import java.io.IOException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
/** /**
@ -55,6 +58,7 @@ public class PrometheusEmitter implements Emitter
private HTTPServer server; private HTTPServer server;
private PushGateway pushGateway; private PushGateway pushGateway;
private String identifier; private String identifier;
private ScheduledExecutorService exec;
static PrometheusEmitter of(PrometheusEmitterConfig config) static PrometheusEmitter of(PrometheusEmitterConfig config)
{ {
@ -91,6 +95,13 @@ public class PrometheusEmitter implements Emitter
} else { } else {
pushGateway = new PushGateway(address); pushGateway = new PushGateway(address);
} }
exec = ScheduledExecutors.fixed(1, "PrometheusPushGatewayEmitter-%s");
exec.scheduleAtFixedRate(
() -> flush(),
config.getFlushPeriod(),
config.getFlushPeriod(),
TimeUnit.SECONDS
);
} }
} }
@ -190,6 +201,7 @@ public class PrometheusEmitter implements Emitter
server.stop(); server.stop();
} }
} else { } else {
exec.shutdownNow();
flush(); flush();
} }
} }

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Objects;
import java.util.regex.Pattern; import java.util.regex.Pattern;
/** /**
@ -53,6 +54,10 @@ public class PrometheusEmitterConfig
@Nullable @Nullable
private final String pushGatewayAddress; private final String pushGatewayAddress;
@JsonProperty
@Nullable
private final Integer flushPeriod;
@JsonProperty @JsonProperty
private final boolean addHostAsLabel; private final boolean addHostAsLabel;
@ -67,7 +72,8 @@ public class PrometheusEmitterConfig
@JsonProperty("port") @Nullable Integer port, @JsonProperty("port") @Nullable Integer port,
@JsonProperty("pushGatewayAddress") @Nullable String pushGatewayAddress, @JsonProperty("pushGatewayAddress") @Nullable String pushGatewayAddress,
@JsonProperty("addHostAsLabel") boolean addHostAsLabel, @JsonProperty("addHostAsLabel") boolean addHostAsLabel,
@JsonProperty("addServiceAsLabel") boolean addServiceAsLabel @JsonProperty("addServiceAsLabel") boolean addServiceAsLabel,
@JsonProperty("flushPeriod") Integer flushPeriod
) )
{ {
this.strategy = strategy != null ? strategy : Strategy.exporter; this.strategy = strategy != null ? strategy : Strategy.exporter;
@ -75,12 +81,18 @@ public class PrometheusEmitterConfig
Preconditions.checkArgument(PATTERN.matcher(this.namespace).matches(), "Invalid namespace " + this.namespace); Preconditions.checkArgument(PATTERN.matcher(this.namespace).matches(), "Invalid namespace " + this.namespace);
if (strategy == Strategy.exporter) { if (strategy == Strategy.exporter) {
Preconditions.checkArgument(port != null, "For `exporter` strategy, port must be specified."); Preconditions.checkArgument(port != null, "For `exporter` strategy, port must be specified.");
} else if (strategy == Strategy.pushgateway) { } else if (this.strategy == Strategy.pushgateway) {
Preconditions.checkArgument(pushGatewayAddress != null, "For `pushgateway` strategy, pushGatewayAddress must be specified."); Preconditions.checkArgument(pushGatewayAddress != null, "For `pushgateway` strategy, pushGatewayAddress must be specified.");
if (Objects.nonNull(flushPeriod)) {
Preconditions.checkArgument(flushPeriod > 0, "flushPeriod must be greater than 0.");
} else {
flushPeriod = 15;
}
} }
this.dimensionMapPath = dimensionMapPath; this.dimensionMapPath = dimensionMapPath;
this.port = port; this.port = port;
this.pushGatewayAddress = pushGatewayAddress; this.pushGatewayAddress = pushGatewayAddress;
this.flushPeriod = flushPeriod;
this.addHostAsLabel = addHostAsLabel; this.addHostAsLabel = addHostAsLabel;
this.addServiceAsLabel = addServiceAsLabel; this.addServiceAsLabel = addServiceAsLabel;
} }
@ -105,6 +117,12 @@ public class PrometheusEmitterConfig
return pushGatewayAddress; return pushGatewayAddress;
} }
@Nullable
public Integer getFlushPeriod()
{
return flushPeriod;
}
public Strategy getStrategy() public Strategy getStrategy()
{ {
return strategy; return strategy;

View File

@ -41,7 +41,7 @@ public class PrometheusEmitterTest
public void testEmitterWithServiceLabel() public void testEmitterWithServiceLabel()
{ {
CollectorRegistry.defaultRegistry.clear(); CollectorRegistry.defaultRegistry.clear();
PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true); PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60);
PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule();
Emitter emitter = prometheusEmitterModule.getEmitter(config); Emitter emitter = prometheusEmitterModule.getEmitter(config);
ServiceMetricEvent build = ServiceMetricEvent.builder() ServiceMetricEvent build = ServiceMetricEvent.builder()
@ -62,7 +62,7 @@ public class PrometheusEmitterTest
public void testEmitterWithServiceAndHostLabel() public void testEmitterWithServiceAndHostLabel()
{ {
CollectorRegistry.defaultRegistry.clear(); CollectorRegistry.defaultRegistry.clear();
PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, true, true); PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, true, true, 60);
PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule();
Emitter emitter = prometheusEmitterModule.getEmitter(config); Emitter emitter = prometheusEmitterModule.getEmitter(config);
ServiceMetricEvent build = ServiceMetricEvent.builder() ServiceMetricEvent build = ServiceMetricEvent.builder()
@ -83,7 +83,7 @@ public class PrometheusEmitterTest
public void testEmitterMetric() public void testEmitterMetric()
{ {
CollectorRegistry.defaultRegistry.clear(); CollectorRegistry.defaultRegistry.clear();
PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace", null, 0, "pushgateway", true, true); PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace", null, 0, "pushgateway", true, true, 60);
PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule();
Emitter emitter = prometheusEmitterModule.getEmitter(config); Emitter emitter = prometheusEmitterModule.getEmitter(config);
ServiceMetricEvent build = ServiceMetricEvent.builder() ServiceMetricEvent build = ServiceMetricEvent.builder()
@ -104,12 +104,12 @@ public class PrometheusEmitterTest
@Test @Test
public void testEmitterStart() public void testEmitterStart()
{ {
PrometheusEmitterConfig exportEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "namespace1", null, 0, null, true, true); PrometheusEmitterConfig exportEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "namespace1", null, 0, null, true, true, 60);
PrometheusEmitter exportEmitter = new PrometheusEmitter(exportEmitterConfig); PrometheusEmitter exportEmitter = new PrometheusEmitter(exportEmitterConfig);
exportEmitter.start(); exportEmitter.start();
Assert.assertNotNull(exportEmitter.getServer()); Assert.assertNotNull(exportEmitter.getServer());
PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace2", null, 0, "pushgateway", true, true); PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace2", null, 0, "pushgateway", true, true, 60);
PrometheusEmitter pushEmitter = new PrometheusEmitter(pushEmitterConfig); PrometheusEmitter pushEmitter = new PrometheusEmitter(pushEmitterConfig);
pushEmitter.start(); pushEmitter.start();
Assert.assertNotNull(pushEmitter.getPushGateway()); Assert.assertNotNull(pushEmitter.getPushGateway());
@ -118,7 +118,7 @@ public class PrometheusEmitterTest
@Test @Test
public void testEmitterPush() throws IOException public void testEmitterPush() throws IOException
{ {
PrometheusEmitterConfig emitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace3", null, 0, "pushgateway", true, true); PrometheusEmitterConfig emitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace3", null, 0, "pushgateway", true, true, 60);
PushGateway mockPushGateway = mock(PushGateway.class); PushGateway mockPushGateway = mock(PushGateway.class);
mockPushGateway.push(anyObject(Collector.class), anyString(), anyObject(ImmutableMap.class)); mockPushGateway.push(anyObject(Collector.class), anyString(), anyObject(ImmutableMap.class));
@ -145,7 +145,8 @@ public class PrometheusEmitterTest
1, 1,
null, null,
true, true,
true true,
60
); );
Assert.assertThrows( Assert.assertThrows(
@ -158,7 +159,8 @@ public class PrometheusEmitterTest
null, null,
null, null,
true, true,
true true,
50
) )
); );
} }
@ -166,7 +168,7 @@ public class PrometheusEmitterTest
@Test @Test
public void testEmitterStartWithHttpUrl() public void testEmitterStartWithHttpUrl()
{ {
PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace4", null, 0, "http://pushgateway", true, true); PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace4", null, 0, "http://pushgateway", true, true, 60);
PrometheusEmitter pushEmitter = new PrometheusEmitter(pushEmitterConfig); PrometheusEmitter pushEmitter = new PrometheusEmitter(pushEmitterConfig);
pushEmitter.start(); pushEmitter.start();
Assert.assertNotNull(pushEmitter.getPushGateway()); Assert.assertNotNull(pushEmitter.getPushGateway());
@ -175,7 +177,7 @@ public class PrometheusEmitterTest
@Test @Test
public void testEmitterStartWithHttpsUrl() public void testEmitterStartWithHttpsUrl()
{ {
PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace5", null, 0, "https://pushgateway", true, true); PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace5", null, 0, "https://pushgateway", true, true, 60);
PrometheusEmitter pushEmitter = new PrometheusEmitter(pushEmitterConfig); PrometheusEmitter pushEmitter = new PrometheusEmitter(pushEmitterConfig);
pushEmitter.start(); pushEmitter.start();
Assert.assertNotNull(pushEmitter.getPushGateway()); Assert.assertNotNull(pushEmitter.getPushGateway());
@ -194,7 +196,8 @@ public class PrometheusEmitterTest
null, null,
"https://pushgateway", "https://pushgateway",
true, true,
true true,
60
) )
); );
@ -206,7 +209,8 @@ public class PrometheusEmitterTest
null, null,
"https://pushgateway", "https://pushgateway",
true, true,
true true,
60
); );
} }
} }

View File

@ -862,6 +862,7 @@ HTTPServer
conversionFactor conversionFactor
prometheus prometheus
Pushgateway Pushgateway
flushPeriod
- ../docs/development/extensions-contrib/tdigestsketch-quantiles.md - ../docs/development/extensions-contrib/tdigestsketch-quantiles.md
postAggregator postAggregator
quantileFromTDigestSketch quantileFromTDigestSketch