diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/PrometheusMetricsSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/PrometheusMetricsSink.java
index 10df76941ca..db2ae857adb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/PrometheusMetricsSink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/PrometheusMetricsSink.java
@@ -26,6 +26,7 @@
 
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
@@ -42,7 +43,10 @@ public class PrometheusMetricsSink implements MetricsSink {
   /**
    * Cached output lines for each metrics.
    */
-  private final Map<String, String> metricLines = new ConcurrentHashMap<>();
+  private Map<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetrics =
+      new ConcurrentHashMap<>();
+  private Map<String, Map<Collection<MetricsTag>, AbstractMetric>> nextPromMetrics =
+      new ConcurrentHashMap<>();
 
   private static final Pattern SPLIT_PATTERN =
       Pattern.compile("(?<!(^|[A-Z]))(?=[A-Z])|(?<!^)(?=[A-Z][a-z])");
@@ -56,35 +60,9 @@ public void putMetrics(MetricsRecord metricsRecord) {
         String key = prometheusName(
             metricsRecord.name(), metric.name());
 
-        StringBuilder builder = new StringBuilder();
-        builder.append("# TYPE ")
-            .append(key)
-            .append(" ")
-            .append(metric.type().toString().toLowerCase())
-            .append("\n")
-            .append(key)
-            .append("{");
-
-        String sep = "";
-
-        //add tags
-        for (MetricsTag tag : metricsRecord.tags()) {
-          String tagName = tag.name().toLowerCase();
-
-          //ignore specific tag which includes sub-hierarchy
-          if (!tagName.equals("numopenconnectionsperuser")) {
-            builder.append(sep)
-                .append(tagName)
-                .append("=\"")
-                .append(tag.value())
-                .append("\"");
-            sep = ",";
-          }
-        }
-        builder.append("} ");
-        builder.append(metric.value());
-        builder.append("\n");
-        metricLines.put(key, builder.toString());
+        nextPromMetrics.computeIfAbsent(key,
+            any -> new ConcurrentHashMap<>())
+            .put(metricsRecord.tags(), metric);
       }
     }
   }
@@ -108,17 +86,55 @@ public String prometheusName(String recordName,
 
   @Override
   public void flush() {
-
+    promMetrics = nextPromMetrics;
+    nextPromMetrics = new ConcurrentHashMap<>();
   }
 
   @Override
-  public void init(SubsetConfiguration subsetConfiguration) {
-
+  public void init(SubsetConfiguration conf) {
   }
 
   public void writeMetrics(Writer writer) throws IOException {
-    for (String line : metricLines.values()) {
-      writer.write(line);
+    for (Map.Entry<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetric :
+        promMetrics.entrySet()) {
+      AbstractMetric firstMetric = promMetric.getValue().values().iterator().next();
+
+      StringBuilder builder = new StringBuilder();
+      builder.append("# HELP ")
+          .append(promMetric.getKey())
+          .append(" ")
+          .append(firstMetric.description())
+          .append("\n")
+          .append("# TYPE ")
+          .append(promMetric.getKey())
+          .append(" ")
+          .append(firstMetric.type().toString().toLowerCase())
+          .append("\n");
+
+      for (Map.Entry<Collection<MetricsTag>, AbstractMetric> metric :
+          promMetric.getValue().entrySet()) {
+        builder.append(promMetric.getKey())
+            .append("{");
+
+        String sep = "";
+        for (MetricsTag tag : metric.getKey()) {
+          String tagName = tag.name().toLowerCase();
+
+          if (!tagName.equals("numopenconnectionsperuser")) {
+            builder.append(sep)
+                .append(tagName)
+                .append("=\"")
+                .append(tag.value())
+                .append("\"");
+            sep = ",";
+          }
+        }
+        builder.append("} ");
+        builder.append(metric.getValue().value());
+        builder.append("\n");
+      }
+
+      writer.write(builder.toString());
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestPrometheusMetricsSink.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestPrometheusMetricsSink.java
index 3fc4aa4cc34..df131913ff4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestPrometheusMetricsSink.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestPrometheusMetricsSink.java
@@ -24,6 +24,7 @@
 
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.annotation.Metric.Type;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
@@ -48,7 +49,6 @@ public void testPublish() throws IOException {
     TestMetrics testMetrics = metrics
         .register("TestMetrics", "Testing metrics", new TestMetrics());
 
-    metrics.start();
     testMetrics.numBucketCreateFails.incr();
     metrics.publishMetricsNow();
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
@@ -67,6 +67,104 @@ public void testPublish() throws IOException {
             "test_metrics_num_bucket_create_fails{context=\"dfs\"")
     );
 
+    metrics.unregisterSource("TestMetrics");
+    metrics.stop();
+    metrics.shutdown();
+  }
+
+  /**
+   * Fix for HADOOP-17804, make sure Prometheus metrics get deduped based on metric
+   * and tags, not just the metric.
+   */
+  @Test
+  public void testPublishMultiple() throws IOException {
+    //GIVEN
+    MetricsSystem metrics = DefaultMetricsSystem.instance();
+
+    metrics.init("test");
+    PrometheusMetricsSink sink = new PrometheusMetricsSink();
+    metrics.register("Prometheus", "Prometheus", sink);
+    TestMetrics testMetrics1 = metrics
+        .register("TestMetrics1", "Testing metrics", new TestMetrics("1"));
+    TestMetrics testMetrics2 = metrics
+        .register("TestMetrics2", "Testing metrics", new TestMetrics("2"));
+
+    testMetrics1.numBucketCreateFails.incr();
+    testMetrics2.numBucketCreateFails.incr();
+    metrics.publishMetricsNow();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
+
+    //WHEN
+    sink.writeMetrics(writer);
+    writer.flush();
+
+    //THEN
+    String writtenMetrics = stream.toString(UTF_8.name());
+    System.out.println(writtenMetrics);
+    Assert.assertTrue(
+        "The expected first metric line is missing from prometheus metrics output",
+        writtenMetrics.contains(
+            "test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue1\"")
+    );
+    Assert.assertTrue(
+        "The expected second metric line is missing from prometheus metrics output",
+        writtenMetrics.contains(
+            "test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue2\"")
+    );
+
+    metrics.unregisterSource("TestMetrics1");
+    metrics.unregisterSource("TestMetrics2");
+    metrics.stop();
+    metrics.shutdown();
+  }
+
+  /**
+   * Fix for HADOOP-17804, make sure Prometheus metrics start fresh after each flush.
+   */
+  @Test
+  public void testPublishFlush() throws IOException {
+    //GIVEN
+    MetricsSystem metrics = DefaultMetricsSystem.instance();
+
+    metrics.init("test");
+    PrometheusMetricsSink sink = new PrometheusMetricsSink();
+    metrics.register("Prometheus", "Prometheus", sink);
+    TestMetrics testMetrics = metrics
+        .register("TestMetrics", "Testing metrics", new TestMetrics("1"));
+
+    testMetrics.numBucketCreateFails.incr();
+    metrics.publishMetricsNow();
+
+    metrics.unregisterSource("TestMetrics");
+    testMetrics = metrics
+        .register("TestMetrics", "Testing metrics", new TestMetrics("2"));
+
+    testMetrics.numBucketCreateFails.incr();
+    metrics.publishMetricsNow();
+
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
+
+    //WHEN
+    sink.writeMetrics(writer);
+    writer.flush();
+
+    //THEN
+    String writtenMetrics = stream.toString(UTF_8.name());
+    System.out.println(writtenMetrics);
+    Assert.assertFalse(
+        "The first metric should not exist after flushing",
+        writtenMetrics.contains(
+            "test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue1\"")
+    );
+    Assert.assertTrue(
+        "The expected metric line is missing from prometheus metrics output",
+        writtenMetrics.contains(
+            "test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue2\"")
+    );
+
+    metrics.unregisterSource("TestMetrics");
     metrics.stop();
     metrics.shutdown();
   }
@@ -126,6 +224,20 @@ public void testNamingWhitespaces() {
    */
   @Metrics(about = "Test Metrics", context = "dfs")
   private static class TestMetrics {
+    private String id;
+
+    TestMetrics() {
+      this("1");
+    }
+
+    TestMetrics(String id) {
+      this.id = id;
+    }
+
+    @Metric(value={"testTag", ""}, type=Type.TAG)
+    String testTag1() {
+      return "testTagValue" + id;
+    }
 
     @Metric
     private MutableCounterLong numBucketCreateFails;
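
Note: with this change writeMetrics emits a single # HELP/# TYPE header per metric
name, followed by one sample line per distinct tag set, instead of one cached line per
name. As a rough sketch (not verbatim output: the HELP text is taken from the metric's
description, and tag ordering or any additional system tags can differ in a real run),
the testPublishMultiple case above should scrape as:

    # HELP test_metrics_num_bucket_create_fails Num bucket create fails
    # TYPE test_metrics_num_bucket_create_fails counter
    test_metrics_num_bucket_create_fails{context="dfs",testtag="testTagValue1"} 1
    test_metrics_num_bucket_create_fails{context="dfs",testtag="testTagValue2"} 1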