HADOOP-17893. Improve PrometheusSink for Namenode TopMetrics (#3426)
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
(cherry picked from commit ae2c5ccfcf
)
This commit is contained in:
parent
92af6cd3bc
commit
9700d98eac
|
@ -17,6 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.metrics2.sink;
|
package org.apache.hadoop.metrics2.sink;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
import org.apache.commons.configuration2.SubsetConfiguration;
|
import org.apache.commons.configuration2.SubsetConfiguration;
|
||||||
import org.apache.hadoop.metrics2.AbstractMetric;
|
import org.apache.hadoop.metrics2.AbstractMetric;
|
||||||
import org.apache.hadoop.metrics2.MetricType;
|
import org.apache.hadoop.metrics2.MetricType;
|
||||||
|
@ -52,6 +55,13 @@ public class PrometheusMetricsSink implements MetricsSink {
|
||||||
Pattern.compile("(?<!(^|[A-Z_]))(?=[A-Z])|(?<!^)(?=[A-Z][a-z])");
|
Pattern.compile("(?<!(^|[A-Z_]))(?=[A-Z])|(?<!^)(?=[A-Z][a-z])");
|
||||||
private static final Pattern DELIMITERS = Pattern.compile("[^a-zA-Z0-9]+");
|
private static final Pattern DELIMITERS = Pattern.compile("[^a-zA-Z0-9]+");
|
||||||
|
|
||||||
|
private static final Pattern NN_TOPMETRICS_PATTERN =
|
||||||
|
Pattern.compile(
|
||||||
|
"^(nn_top_user_op_counts_window_ms_\\d+)_op_.*?(total_count|count)$");
|
||||||
|
private static final Pattern NN_TOPMETRICS_TAGS_PATTERN =
|
||||||
|
Pattern
|
||||||
|
.compile("^op=(?<op>\\w+)(.user=(?<user>.*)|)\\.(TotalCount|count)$");
|
||||||
|
|
||||||
public PrometheusMetricsSink() {
|
public PrometheusMetricsSink() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,25 +105,28 @@ public class PrometheusMetricsSink implements MetricsSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void writeMetrics(Writer writer) throws IOException {
|
public void writeMetrics(Writer writer) throws IOException {
|
||||||
|
List<String> extendMetricsTags = new ArrayList<>();
|
||||||
for (Map.Entry<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetric :
|
for (Map.Entry<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetric :
|
||||||
promMetrics.entrySet()) {
|
promMetrics.entrySet()) {
|
||||||
AbstractMetric firstMetric = promMetric.getValue().values().iterator().next();
|
AbstractMetric firstMetric = promMetric.getValue().values().iterator().next();
|
||||||
|
String metricKey = getMetricKey(promMetric.getKey(), firstMetric,
|
||||||
|
extendMetricsTags);
|
||||||
|
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
builder.append("# HELP ")
|
builder.append("# HELP ")
|
||||||
.append(promMetric.getKey())
|
.append(metricKey)
|
||||||
.append(" ")
|
.append(" ")
|
||||||
.append(firstMetric.description())
|
.append(firstMetric.description())
|
||||||
.append("\n")
|
.append("\n")
|
||||||
.append("# TYPE ")
|
.append("# TYPE ")
|
||||||
.append(promMetric.getKey())
|
.append(metricKey)
|
||||||
.append(" ")
|
.append(" ")
|
||||||
.append(firstMetric.type().toString().toLowerCase())
|
.append(firstMetric.type().toString().toLowerCase())
|
||||||
.append("\n");
|
.append("\n");
|
||||||
|
|
||||||
for (Map.Entry<Collection<MetricsTag>, AbstractMetric> metric :
|
for (Map.Entry<Collection<MetricsTag>, AbstractMetric> metric :
|
||||||
promMetric.getValue().entrySet()) {
|
promMetric.getValue().entrySet()) {
|
||||||
builder.append(promMetric.getKey())
|
builder.append(metricKey)
|
||||||
.append("{");
|
.append("{");
|
||||||
|
|
||||||
String sep = "";
|
String sep = "";
|
||||||
|
@ -129,6 +142,13 @@ public class PrometheusMetricsSink implements MetricsSink {
|
||||||
sep = ",";
|
sep = ",";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!extendMetricsTags.isEmpty()) {
|
||||||
|
//add extend tags
|
||||||
|
for (String tagStr : extendMetricsTags) {
|
||||||
|
builder.append(sep).append(tagStr);
|
||||||
|
}
|
||||||
|
extendMetricsTags.clear();
|
||||||
|
}
|
||||||
builder.append("} ");
|
builder.append("} ");
|
||||||
builder.append(metric.getValue().value());
|
builder.append(metric.getValue().value());
|
||||||
builder.append("\n");
|
builder.append("\n");
|
||||||
|
@ -137,4 +157,39 @@ public class PrometheusMetricsSink implements MetricsSink {
|
||||||
writer.write(builder.toString());
|
writer.write(builder.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getMetricKey(String promMetricKey, AbstractMetric metric,
|
||||||
|
List<String> extendTags) {
|
||||||
|
Matcher matcher = NN_TOPMETRICS_PATTERN.matcher(promMetricKey);
|
||||||
|
if (matcher.find() && matcher.groupCount() == 2) {
|
||||||
|
extendTags.addAll(parseTopMetricsTags(metric.name()));
|
||||||
|
return String.format("%s_%s",
|
||||||
|
matcher.group(1), matcher.group(2));
|
||||||
|
}
|
||||||
|
return promMetricKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse Custom tags for TopMetrics.
|
||||||
|
*
|
||||||
|
* @param metricName metricName
|
||||||
|
* @return Tags for TopMetrics
|
||||||
|
*/
|
||||||
|
private List<String> parseTopMetricsTags(String metricName) {
|
||||||
|
List<String> topMetricsTags = new ArrayList<>();
|
||||||
|
Matcher matcher = NN_TOPMETRICS_TAGS_PATTERN.matcher(metricName);
|
||||||
|
if (matcher.find()) {
|
||||||
|
String op = matcher.group("op");
|
||||||
|
String user = matcher.group("user");
|
||||||
|
// add tag op = "$op"
|
||||||
|
topMetricsTags.add(String
|
||||||
|
.format("op=\"%s\"", op));
|
||||||
|
if (StringUtils.isNoneEmpty(user)) {
|
||||||
|
// add tag user = "$user"
|
||||||
|
topMetricsTags.add(String
|
||||||
|
.format("user=\"%s\"", user));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return topMetricsTags;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,17 +21,25 @@ import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStreamWriter;
|
import java.io.OutputStreamWriter;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSource;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metric.Type;
|
import org.apache.hadoop.metrics2.annotation.Metric.Type;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.Interns;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test prometheus Sink.
|
* Test prometheus Sink.
|
||||||
|
@ -219,6 +227,53 @@ public class TestPrometheusMetricsSink {
|
||||||
sink.prometheusName(recordName, metricName));
|
sink.prometheusName(recordName, metricName));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* testTopMetricsPublish.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testTopMetricsPublish() throws IOException {
|
||||||
|
MetricsSystem metrics = DefaultMetricsSystem.instance();
|
||||||
|
|
||||||
|
metrics.init("test");
|
||||||
|
|
||||||
|
//GIVEN
|
||||||
|
PrometheusMetricsSink sink = new PrometheusMetricsSink();
|
||||||
|
|
||||||
|
metrics.register("prometheus", "prometheus", sink);
|
||||||
|
TestTopMetrics topMetrics = new TestTopMetrics();
|
||||||
|
topMetrics.add("60000");
|
||||||
|
topMetrics.add("1500000");
|
||||||
|
metrics.register(TestTopMetrics.TOPMETRICS_METRICS_SOURCE_NAME,
|
||||||
|
"Top N operations by user", topMetrics);
|
||||||
|
|
||||||
|
metrics.start();
|
||||||
|
|
||||||
|
metrics.publishMetricsNow();
|
||||||
|
ByteArrayOutputStream stream = new ByteArrayOutputStream();
|
||||||
|
OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
|
||||||
|
|
||||||
|
//WHEN
|
||||||
|
sink.writeMetrics(writer);
|
||||||
|
writer.flush();
|
||||||
|
|
||||||
|
//THEN
|
||||||
|
String writtenMetrics = stream.toString(UTF_8.name());
|
||||||
|
System.out.println(writtenMetrics);
|
||||||
|
|
||||||
|
assertThat(writtenMetrics)
|
||||||
|
.contains(
|
||||||
|
"nn_top_user_op_counts_window_ms_60000_total_count{context=\"dfs\"")
|
||||||
|
.contains(
|
||||||
|
"nn_top_user_op_counts_window_ms_60000_count{")
|
||||||
|
.contains(
|
||||||
|
"nn_top_user_op_counts_window_ms_1500000_count{")
|
||||||
|
.contains(
|
||||||
|
"op=\"rename\",user=\"hadoop/TEST_HOSTNAME.com@HOSTNAME.COM\"");
|
||||||
|
|
||||||
|
metrics.stop();
|
||||||
|
metrics.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Example metric pojo.
|
* Example metric pojo.
|
||||||
*/
|
*/
|
||||||
|
@ -242,4 +297,38 @@ public class TestPrometheusMetricsSink {
|
||||||
@Metric
|
@Metric
|
||||||
private MutableCounterLong numBucketCreateFails;
|
private MutableCounterLong numBucketCreateFails;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Example metric TopMetrics.
|
||||||
|
*/
|
||||||
|
private class TestTopMetrics implements MetricsSource {
|
||||||
|
|
||||||
|
public static final String TOPMETRICS_METRICS_SOURCE_NAME =
|
||||||
|
"NNTopUserOpCounts";
|
||||||
|
private final List<String> windowMsNames = new ArrayList<>();
|
||||||
|
|
||||||
|
public void add(String windowMs) {
|
||||||
|
windowMsNames.add(String.format(".windowMs=%s", windowMs));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||||
|
for (String windowMs : windowMsNames) {
|
||||||
|
MetricsRecordBuilder rb = collector
|
||||||
|
.addRecord(TOPMETRICS_METRICS_SOURCE_NAME + windowMs)
|
||||||
|
.setContext("dfs");
|
||||||
|
rb.addCounter(
|
||||||
|
Interns.info("op=" + StringUtils.deleteWhitespace("rename")
|
||||||
|
+ ".TotalCount", "Total operation count"), 2);
|
||||||
|
rb.addCounter(
|
||||||
|
Interns.info("op=" + StringUtils.deleteWhitespace("rename")
|
||||||
|
+ ".user=" + "hadoop/TEST_HOSTNAME.com@HOSTNAME.COM"
|
||||||
|
+ ".count", "Total operations performed by user"), 3);
|
||||||
|
rb.addCounter(
|
||||||
|
Interns.info("op=" + StringUtils.deleteWhitespace("delete")
|
||||||
|
+ ".user=" + "test_user2"
|
||||||
|
+ ".count", "Total operations performed by user"), 4);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue