HADOOP-17804. Expose prometheus metrics only after a flush and dedupe with tag values (#3369)
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
parent
e708836641
commit
4ced012f33
|
@ -26,6 +26,7 @@ import org.apache.hadoop.metrics2.MetricsTag;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
@ -42,7 +43,10 @@ public class PrometheusMetricsSink implements MetricsSink {
|
||||||
/**
|
/**
|
||||||
* Cached output lines for each metrics.
|
* Cached output lines for each metrics.
|
||||||
*/
|
*/
|
||||||
private final Map<String, String> metricLines = new ConcurrentHashMap<>();
|
private Map<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetrics =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
private Map<String, Map<Collection<MetricsTag>, AbstractMetric>> nextPromMetrics =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private static final Pattern SPLIT_PATTERN =
|
private static final Pattern SPLIT_PATTERN =
|
||||||
Pattern.compile("(?<!(^|[A-Z_]))(?=[A-Z])|(?<!^)(?=[A-Z][a-z])");
|
Pattern.compile("(?<!(^|[A-Z_]))(?=[A-Z])|(?<!^)(?=[A-Z][a-z])");
|
||||||
|
@ -53,42 +57,16 @@ public class PrometheusMetricsSink implements MetricsSink {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void putMetrics(MetricsRecord metricsRecord) {
|
public void putMetrics(MetricsRecord metricsRecord) {
|
||||||
for (AbstractMetric metrics : metricsRecord.metrics()) {
|
for (AbstractMetric metric : metricsRecord.metrics()) {
|
||||||
if (metrics.type() == MetricType.COUNTER
|
if (metric.type() == MetricType.COUNTER
|
||||||
|| metrics.type() == MetricType.GAUGE) {
|
|| metric.type() == MetricType.GAUGE) {
|
||||||
|
|
||||||
String key = prometheusName(
|
String key = prometheusName(
|
||||||
metricsRecord.name(), metrics.name());
|
metricsRecord.name(), metric.name());
|
||||||
|
|
||||||
StringBuilder builder = new StringBuilder();
|
|
||||||
builder.append("# TYPE ")
|
|
||||||
.append(key)
|
|
||||||
.append(" ")
|
|
||||||
.append(metrics.type().toString().toLowerCase())
|
|
||||||
.append("\n")
|
|
||||||
.append(key)
|
|
||||||
.append("{");
|
|
||||||
String sep = "";
|
|
||||||
|
|
||||||
//add tags
|
|
||||||
for (MetricsTag tag : metricsRecord.tags()) {
|
|
||||||
String tagName = tag.name().toLowerCase();
|
|
||||||
|
|
||||||
//ignore specific tag which includes sub-hierarchy
|
|
||||||
if (!tagName.equals("numopenconnectionsperuser")) {
|
|
||||||
builder.append(sep)
|
|
||||||
.append(tagName)
|
|
||||||
.append("=\"")
|
|
||||||
.append(tag.value())
|
|
||||||
.append("\"");
|
|
||||||
sep = ",";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
builder.append("} ");
|
|
||||||
builder.append(metrics.value());
|
|
||||||
builder.append("\n");
|
|
||||||
metricLines.put(key, builder.toString());
|
|
||||||
|
|
||||||
|
nextPromMetrics.computeIfAbsent(key,
|
||||||
|
any -> new ConcurrentHashMap<>())
|
||||||
|
.put(metricsRecord.tags(), metric);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -108,17 +86,55 @@ public class PrometheusMetricsSink implements MetricsSink {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flush() {
|
public void flush() {
|
||||||
|
promMetrics = nextPromMetrics;
|
||||||
|
nextPromMetrics = new ConcurrentHashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(SubsetConfiguration subsetConfiguration) {
|
public void init(SubsetConfiguration conf) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void writeMetrics(Writer writer) throws IOException {
|
public void writeMetrics(Writer writer) throws IOException {
|
||||||
for (String line : metricLines.values()) {
|
for (Map.Entry<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetric :
|
||||||
writer.write(line);
|
promMetrics.entrySet()) {
|
||||||
|
AbstractMetric firstMetric = promMetric.getValue().values().iterator().next();
|
||||||
|
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
builder.append("# HELP ")
|
||||||
|
.append(promMetric.getKey())
|
||||||
|
.append(" ")
|
||||||
|
.append(firstMetric.description())
|
||||||
|
.append("\n")
|
||||||
|
.append("# TYPE ")
|
||||||
|
.append(promMetric.getKey())
|
||||||
|
.append(" ")
|
||||||
|
.append(firstMetric.type().toString().toLowerCase())
|
||||||
|
.append("\n");
|
||||||
|
|
||||||
|
for (Map.Entry<Collection<MetricsTag>, AbstractMetric> metric :
|
||||||
|
promMetric.getValue().entrySet()) {
|
||||||
|
builder.append(promMetric.getKey())
|
||||||
|
.append("{");
|
||||||
|
|
||||||
|
String sep = "";
|
||||||
|
for (MetricsTag tag : metric.getKey()) {
|
||||||
|
String tagName = tag.name().toLowerCase();
|
||||||
|
|
||||||
|
if (!tagName.equals("numopenconnectionsperuser")) {
|
||||||
|
builder.append(sep)
|
||||||
|
.append(tagName)
|
||||||
|
.append("=\"")
|
||||||
|
.append(tag.value())
|
||||||
|
.append("\"");
|
||||||
|
sep = ",";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
builder.append("} ");
|
||||||
|
builder.append(metric.getValue().value());
|
||||||
|
builder.append("\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.write(builder.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.io.OutputStreamWriter;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metric.Type;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
|
|
||||||
|
@ -48,7 +49,6 @@ public class TestPrometheusMetricsSink {
|
||||||
TestMetrics testMetrics = metrics
|
TestMetrics testMetrics = metrics
|
||||||
.register("TestMetrics", "Testing metrics", new TestMetrics());
|
.register("TestMetrics", "Testing metrics", new TestMetrics());
|
||||||
|
|
||||||
metrics.start();
|
|
||||||
testMetrics.numBucketCreateFails.incr();
|
testMetrics.numBucketCreateFails.incr();
|
||||||
metrics.publishMetricsNow();
|
metrics.publishMetricsNow();
|
||||||
ByteArrayOutputStream stream = new ByteArrayOutputStream();
|
ByteArrayOutputStream stream = new ByteArrayOutputStream();
|
||||||
|
@ -67,6 +67,104 @@ public class TestPrometheusMetricsSink {
|
||||||
"test_metrics_num_bucket_create_fails{context=\"dfs\"")
|
"test_metrics_num_bucket_create_fails{context=\"dfs\"")
|
||||||
);
|
);
|
||||||
|
|
||||||
|
metrics.unregisterSource("TestMetrics");
|
||||||
|
metrics.stop();
|
||||||
|
metrics.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fix for HADOOP-17804, make sure Prometheus metrics get deduped based on metric
|
||||||
|
* and tags, not just the metric.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testPublishMultiple() throws IOException {
|
||||||
|
//GIVEN
|
||||||
|
MetricsSystem metrics = DefaultMetricsSystem.instance();
|
||||||
|
|
||||||
|
metrics.init("test");
|
||||||
|
PrometheusMetricsSink sink = new PrometheusMetricsSink();
|
||||||
|
metrics.register("Prometheus", "Prometheus", sink);
|
||||||
|
TestMetrics testMetrics1 = metrics
|
||||||
|
.register("TestMetrics1", "Testing metrics", new TestMetrics("1"));
|
||||||
|
TestMetrics testMetrics2 = metrics
|
||||||
|
.register("TestMetrics2", "Testing metrics", new TestMetrics("2"));
|
||||||
|
|
||||||
|
testMetrics1.numBucketCreateFails.incr();
|
||||||
|
testMetrics2.numBucketCreateFails.incr();
|
||||||
|
metrics.publishMetricsNow();
|
||||||
|
ByteArrayOutputStream stream = new ByteArrayOutputStream();
|
||||||
|
OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
|
||||||
|
|
||||||
|
//WHEN
|
||||||
|
sink.writeMetrics(writer);
|
||||||
|
writer.flush();
|
||||||
|
|
||||||
|
//THEN
|
||||||
|
String writtenMetrics = stream.toString(UTF_8.name());
|
||||||
|
System.out.println(writtenMetrics);
|
||||||
|
Assert.assertTrue(
|
||||||
|
"The expected first metric line is missing from prometheus metrics output",
|
||||||
|
writtenMetrics.contains(
|
||||||
|
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue1\"")
|
||||||
|
);
|
||||||
|
Assert.assertTrue(
|
||||||
|
"The expected second metric line is missing from prometheus metrics output",
|
||||||
|
writtenMetrics.contains(
|
||||||
|
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue2\"")
|
||||||
|
);
|
||||||
|
|
||||||
|
metrics.unregisterSource("TestMetrics1");
|
||||||
|
metrics.unregisterSource("TestMetrics2");
|
||||||
|
metrics.stop();
|
||||||
|
metrics.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fix for HADOOP-17804, make sure Prometheus metrics start fresh after each flush.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testPublishFlush() throws IOException {
|
||||||
|
//GIVEN
|
||||||
|
MetricsSystem metrics = DefaultMetricsSystem.instance();
|
||||||
|
|
||||||
|
metrics.init("test");
|
||||||
|
PrometheusMetricsSink sink = new PrometheusMetricsSink();
|
||||||
|
metrics.register("Prometheus", "Prometheus", sink);
|
||||||
|
TestMetrics testMetrics = metrics
|
||||||
|
.register("TestMetrics", "Testing metrics", new TestMetrics("1"));
|
||||||
|
|
||||||
|
testMetrics.numBucketCreateFails.incr();
|
||||||
|
metrics.publishMetricsNow();
|
||||||
|
|
||||||
|
metrics.unregisterSource("TestMetrics");
|
||||||
|
testMetrics = metrics
|
||||||
|
.register("TestMetrics", "Testing metrics", new TestMetrics("2"));
|
||||||
|
|
||||||
|
testMetrics.numBucketCreateFails.incr();
|
||||||
|
metrics.publishMetricsNow();
|
||||||
|
|
||||||
|
ByteArrayOutputStream stream = new ByteArrayOutputStream();
|
||||||
|
OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
|
||||||
|
|
||||||
|
//WHEN
|
||||||
|
sink.writeMetrics(writer);
|
||||||
|
writer.flush();
|
||||||
|
|
||||||
|
//THEN
|
||||||
|
String writtenMetrics = stream.toString(UTF_8.name());
|
||||||
|
System.out.println(writtenMetrics);
|
||||||
|
Assert.assertFalse(
|
||||||
|
"The first metric should not exist after flushing",
|
||||||
|
writtenMetrics.contains(
|
||||||
|
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue1\"")
|
||||||
|
);
|
||||||
|
Assert.assertTrue(
|
||||||
|
"The expected metric line is missing from prometheus metrics output",
|
||||||
|
writtenMetrics.contains(
|
||||||
|
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue2\"")
|
||||||
|
);
|
||||||
|
|
||||||
|
metrics.unregisterSource("TestMetrics");
|
||||||
metrics.stop();
|
metrics.stop();
|
||||||
metrics.shutdown();
|
metrics.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -126,6 +224,20 @@ public class TestPrometheusMetricsSink {
|
||||||
*/
|
*/
|
||||||
@Metrics(about = "Test Metrics", context = "dfs")
|
@Metrics(about = "Test Metrics", context = "dfs")
|
||||||
private static class TestMetrics {
|
private static class TestMetrics {
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
TestMetrics() {
|
||||||
|
this("1");
|
||||||
|
}
|
||||||
|
|
||||||
|
TestMetrics(String id) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Metric(value={"testTag", ""}, type=Type.TAG)
|
||||||
|
String testTag1() {
|
||||||
|
return "testTagValue" + id;
|
||||||
|
}
|
||||||
|
|
||||||
@Metric
|
@Metric
|
||||||
private MutableCounterLong numBucketCreateFails;
|
private MutableCounterLong numBucketCreateFails;
|
||||||
|
|
Loading…
Reference in New Issue