HADOOP-16398. Exports Hadoop metrics to Prometheus (#1170)
This commit is contained in:
parent
a6f47b5876
commit
8bda91d20a
|
@ -961,5 +961,13 @@ public class CommonConfigurationKeysPublic {
|
|||
/** Default shutdown hook timeout: {@value} seconds. */
|
||||
public static final long SERVICE_SHUTDOWN_TIMEOUT_DEFAULT = 30;
|
||||
|
||||
/**
|
||||
* @see
|
||||
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
|
||||
* core-default.xml</a>
|
||||
*/
|
||||
public static final String HADOOP_PROMETHEUS_ENABLED =
|
||||
"hadoop.prometheus.endpoint.enabled";
|
||||
public static final boolean HADOOP_PROMETHEUS_ENABLED_DEFAULT = false;
|
||||
}
|
||||
|
||||
|
|
|
@ -61,8 +61,11 @@ import org.apache.hadoop.conf.ConfServlet;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configuration.IntegerRanges;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.jmx.JMXJsonServlet;
|
||||
import org.apache.hadoop.log.LogLevel;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.sink.PrometheusMetricsSink;
|
||||
import org.apache.hadoop.security.AuthenticationFilterInitializer;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -191,6 +194,11 @@ public final class HttpServer2 implements FilterContainer {
|
|||
private static final String X_FRAME_OPTIONS = "X-FRAME-OPTIONS";
|
||||
private static final Pattern PATTERN_HTTP_HEADER_REGEX =
|
||||
Pattern.compile(HTTP_HEADER_REGEX);
|
||||
|
||||
private boolean prometheusSupport;
|
||||
protected static final String PROMETHEUS_SINK = "PROMETHEUS_SINK";
|
||||
private PrometheusMetricsSink prometheusMetricsSink;
|
||||
|
||||
/**
|
||||
* Class to construct instances of HTTP server with specific options.
|
||||
*/
|
||||
|
@ -612,6 +620,19 @@ public final class HttpServer2 implements FilterContainer {
|
|||
}
|
||||
|
||||
addDefaultServlets();
|
||||
addPrometheusServlet(conf);
|
||||
}
|
||||
|
||||
private void addPrometheusServlet(Configuration conf) {
|
||||
prometheusSupport = conf.getBoolean(
|
||||
CommonConfigurationKeysPublic.HADOOP_PROMETHEUS_ENABLED,
|
||||
CommonConfigurationKeysPublic.HADOOP_PROMETHEUS_ENABLED_DEFAULT);
|
||||
if (prometheusSupport) {
|
||||
prometheusMetricsSink = new PrometheusMetricsSink();
|
||||
getWebAppContext().getServletContext()
|
||||
.setAttribute(PROMETHEUS_SINK, prometheusMetricsSink);
|
||||
addServlet("prometheus", "/prom", PrometheusServlet.class);
|
||||
}
|
||||
}
|
||||
|
||||
private void addListener(ServerConnector connector) {
|
||||
|
@ -1133,6 +1154,11 @@ public final class HttpServer2 implements FilterContainer {
|
|||
try {
|
||||
openListeners();
|
||||
webServer.start();
|
||||
if (prometheusSupport) {
|
||||
DefaultMetricsSystem.instance()
|
||||
.register("prometheus", "Hadoop metrics prometheus exporter",
|
||||
prometheusMetricsSink);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.info("HttpServer.start() threw a non Bind IOException", ex);
|
||||
throw ex;
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.http;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.sink.PrometheusMetricsSink;
|
||||
|
||||
/**
|
||||
* Servlet to publish hadoop metrics in prometheus format.
|
||||
*/
|
||||
public class PrometheusServlet extends HttpServlet {
|
||||
|
||||
public PrometheusMetricsSink getPrometheusSink() {
|
||||
return
|
||||
(PrometheusMetricsSink) getServletContext().getAttribute(
|
||||
HttpServer2.PROMETHEUS_SINK);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
|
||||
throws ServletException, IOException {
|
||||
DefaultMetricsSystem.instance().publishMetricsNow();
|
||||
getPrometheusSink().writeMetrics(resp.getWriter());
|
||||
resp.getWriter().flush();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.metrics2.sink;
|
||||
|
||||
import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricType;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;

import java.io.IOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
/**
|
||||
* Metrics sink for prometheus exporter.
|
||||
* <p>
|
||||
* Stores the metric data in-memory and return with it on request.
|
||||
*/
|
||||
public class PrometheusMetricsSink implements MetricsSink {
|
||||
|
||||
/**
|
||||
* Cached output lines for each metrics.
|
||||
*/
|
||||
private Map<String, String> metricLines = new HashMap<>();
|
||||
|
||||
private static final Pattern SPLIT_PATTERN =
|
||||
Pattern.compile("(?<!(^|[A-Z_]))(?=[A-Z])|(?<!^)(?=[A-Z][a-z])");
|
||||
|
||||
public PrometheusMetricsSink() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putMetrics(MetricsRecord metricsRecord) {
|
||||
for (AbstractMetric metrics : metricsRecord.metrics()) {
|
||||
if (metrics.type() == MetricType.COUNTER
|
||||
|| metrics.type() == MetricType.GAUGE) {
|
||||
|
||||
String key = prometheusName(
|
||||
metricsRecord.name(), metrics.name());
|
||||
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("# TYPE " + key + " " +
|
||||
metrics.type().toString().toLowerCase() + "\n");
|
||||
builder.append(key + "{");
|
||||
String sep = "";
|
||||
|
||||
//add tags
|
||||
for (MetricsTag tag : metricsRecord.tags()) {
|
||||
String tagName = tag.name().toLowerCase();
|
||||
|
||||
//ignore specific tag which includes sub-hierarchy
|
||||
if (!tagName.equals("numopenconnectionsperuser")) {
|
||||
builder.append(
|
||||
sep + tagName + "=\"" + tag.value() + "\"");
|
||||
sep = ",";
|
||||
}
|
||||
}
|
||||
builder.append("} ");
|
||||
builder.append(metrics.value());
|
||||
metricLines.put(key, builder.toString());
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert CamelCase based names to lower-case names where the separator
|
||||
* is the underscore, to follow prometheus naming conventions.
|
||||
*/
|
||||
public String prometheusName(String recordName,
|
||||
String metricName) {
|
||||
String baseName = StringUtils.capitalize(recordName)
|
||||
+ StringUtils.capitalize(metricName);
|
||||
baseName = baseName.replace('-', '_');
|
||||
String[] parts = SPLIT_PATTERN.split(baseName);
|
||||
return String.join("_", parts).toLowerCase();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(SubsetConfiguration subsetConfiguration) {
|
||||
|
||||
}
|
||||
|
||||
public void writeMetrics(Writer writer) throws IOException {
|
||||
for (String line : metricLines.values()) {
|
||||
writer.write(line + "\n");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3494,4 +3494,14 @@
|
|||
the right KMS for encrypted files.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hadoop.prometheus.endpoint.enabled</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
If set to true, prometheus compatible metric page on the HTTP servers
|
||||
is enabled via '/prom' endpoint.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.metrics2.sink;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStreamWriter;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
|
||||
/**
|
||||
* Test prometheus Sink.
|
||||
*/
|
||||
public class TestPrometheusMetricsSink {
|
||||
|
||||
@Test
|
||||
public void testPublish() throws IOException {
|
||||
//GIVEN
|
||||
MetricsSystem metrics = DefaultMetricsSystem.instance();
|
||||
|
||||
metrics.init("test");
|
||||
PrometheusMetricsSink sink = new PrometheusMetricsSink();
|
||||
metrics.register("Prometheus", "Prometheus", sink);
|
||||
TestMetrics testMetrics = metrics
|
||||
.register("TestMetrics", "Testing metrics", new TestMetrics());
|
||||
|
||||
metrics.start();
|
||||
testMetrics.numBucketCreateFails.incr();
|
||||
metrics.publishMetricsNow();
|
||||
ByteArrayOutputStream stream = new ByteArrayOutputStream();
|
||||
OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
|
||||
|
||||
//WHEN
|
||||
sink.writeMetrics(writer);
|
||||
writer.flush();
|
||||
|
||||
//THEN
|
||||
String writtenMetrics = stream.toString(UTF_8.name());
|
||||
System.out.println(writtenMetrics);
|
||||
Assert.assertTrue(
|
||||
"The expected metric line is missing from prometheus metrics output",
|
||||
writtenMetrics.contains(
|
||||
"test_metrics_num_bucket_create_fails{context=\"dfs\"")
|
||||
);
|
||||
|
||||
metrics.stop();
|
||||
metrics.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamingCamelCase() {
|
||||
PrometheusMetricsSink sink = new PrometheusMetricsSink();
|
||||
|
||||
Assert.assertEquals("rpc_time_some_metrics",
|
||||
sink.prometheusName("RpcTime", "SomeMetrics"));
|
||||
|
||||
Assert.assertEquals("om_rpc_time_om_info_keys",
|
||||
sink.prometheusName("OMRpcTime", "OMInfoKeys"));
|
||||
|
||||
Assert.assertEquals("rpc_time_small",
|
||||
sink.prometheusName("RpcTime", "small"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamingPipeline() {
|
||||
PrometheusMetricsSink sink = new PrometheusMetricsSink();
|
||||
|
||||
String recordName = "SCMPipelineMetrics";
|
||||
String metricName = "NumBlocksAllocated-"
|
||||
+ "RATIS-THREE-47659e3d-40c9-43b3-9792-4982fc279aba";
|
||||
Assert.assertEquals(
|
||||
"scm_pipeline_metrics_"
|
||||
+ "num_blocks_allocated_"
|
||||
+ "ratis_three_47659e3d_40c9_43b3_9792_4982fc279aba",
|
||||
sink.prometheusName(recordName, metricName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Example metric pojo.
|
||||
*/
|
||||
@Metrics(about = "Test Metrics", context = "dfs")
|
||||
private static class TestMetrics {
|
||||
|
||||
@Metric
|
||||
private MutableCounterLong numBucketCreateFails;
|
||||
}
|
||||
}
|
|
@ -70,6 +70,12 @@ public abstract class BaseHttpServer {
|
|||
this.httpAddress = getHttpBindAddress();
|
||||
this.httpsAddress = getHttpsBindAddress();
|
||||
HttpServer2.Builder builder = null;
|
||||
|
||||
// Avoid registering o.a.h.http.PrometheusServlet in HttpServer2.
|
||||
// TODO: Replace "hadoop.prometheus.endpoint.enabled" with
|
||||
// CommonConfigurationKeysPublic.HADOOP_PROMETHEUS_ENABLED when possible.
|
||||
conf.setBoolean("hadoop.prometheus.endpoint.enabled", false);
|
||||
|
||||
builder = DFSUtil.httpServerTemplateForNNAndJN(conf, this.httpAddress,
|
||||
this.httpsAddress, name, getSpnegoPrincipal(), getKeytabFile());
|
||||
|
||||
|
|
Loading…
Reference in New Issue