From a38dceda66e727ef167783d012beb39dc623633d Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Sat, 19 Sep 2015 12:41:29 +0100
Subject: [PATCH] HADOOP-12360. Create StatsD metrics2 sink. (Dave Marion via stevel)

---
 .../hadoop-common/CHANGES.txt                 |   2 +
 .../apache/hadoop/metrics2/package-info.java  |   4 +-
 .../hadoop/metrics2/sink/StatsDSink.java      | 218 ++++++++++++++++++
 .../metrics2/impl/TestStatsDMetrics.java      | 122 ++++++++++
 4 files changed, 345 insertions(+), 1 deletion(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/StatsDSink.java
 create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestStatsDMetrics.java

diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 4a495daa231..f3a16e95926 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -31,6 +31,8 @@ Release 2.8.0 - UNRELEASED
     HADOOP-5732. Add SFTP FileSystem. (Ramtin Boustani and Inigo Goiri via
     cdouglas)
 
+    HADOOP-12360. Create StatsD metrics2 sink. (Dave Marion via stevel)
+
   IMPROVEMENTS
 
     HADOOP-12271. Hadoop Jar Error Should Be More Explanatory
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java
index be2149977cd..877c91274f5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java
@@ -74,7 +74,9 @@
 org.apache.hadoop.metrics2.sink
 Builtin metrics sink implementations including the
-  {@link org.apache.hadoop.metrics2.sink.FileSink}.
+  {@link org.apache.hadoop.metrics2.sink.FileSink},
+  {@link org.apache.hadoop.metrics2.sink.GraphiteSink}, and
+  {@link org.apache.hadoop.metrics2.sink.StatsDSink}.
 org.apache.hadoop.metrics2.util
 General utilities for implementing metrics sinks etc., including the
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/StatsDSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/StatsDSink.java
new file mode 100644
index 00000000000..b8059afdcb0
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/StatsDSink.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricType;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+import org.apache.hadoop.net.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A metrics sink that writes metrics to a StatsD daemon.
+ * This sink will produce metrics of the form
+ * '[hostname].servicename.context.name.metricname:value|type'
+ * where hostname is optional. This is useful when sending to
+ * a daemon that is running on the localhost and will add the
+ * hostname to the metric (such as the
+ * CollectD StatsD plugin).
+ *
+ * To configure this plugin, you will need to add the following
+ * entries to your hadoop-metrics2.properties file:
+ *
+ *
+ * *.sink.statsd.class=org.apache.hadoop.metrics2.sink.StatsDSink
+ * [prefix].sink.statsd.server.host=
+ * [prefix].sink.statsd.server.port=
+ * [prefix].sink.statsd.skip.hostname=true|false (optional)
+ * [prefix].sink.statsd.service.name=NameNode (name you want for service)
+ * 
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class StatsDSink implements MetricsSink, Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(StatsDSink.class);
+  private static final String PERIOD = ".";
+  private static final String SERVER_HOST_KEY = "server.host";
+  private static final String SERVER_PORT_KEY = "server.port";
+  private static final String HOST_NAME_KEY = "host.name";
+  private static final String SERVICE_NAME_KEY = "service.name";
+  private static final String SKIP_HOSTNAME_KEY = "skip.hostname";
+  private boolean skipHostname = false;
+  private String hostName = null;
+  private String serviceName = null;
+  private StatsD statsd = null;
+
+  @Override
+  public void init(SubsetConfiguration conf) {
+    // Get StatsD host configurations.
+    final String serverHost = conf.getString(SERVER_HOST_KEY);
+    final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));
+
+    skipHostname = conf.getBoolean(SKIP_HOSTNAME_KEY, false);
+    if (!skipHostname) {
+      hostName = conf.getString(HOST_NAME_KEY, null);
+      if (null == hostName) {
+        hostName = NetUtils.getHostname();
+      }
+    }
+
+    serviceName = conf.getString(SERVICE_NAME_KEY, null);
+
+    statsd = new StatsD(serverHost, serverPort);
+  }
+
+  @Override
+  public void putMetrics(MetricsRecord record) {
+
+    String hn = hostName;
+    String ctx = record.context();
+    String sn = serviceName;
+
+    for (MetricsTag tag : record.tags()) {
+      if (tag.info().name().equals(MsInfo.Hostname.name())
+          && tag.value() != null) {
+        hn = tag.value();
+      } else if (tag.info().name().equals(MsInfo.Context.name())
+          && tag.value() != null) {
+        ctx = tag.value();
+      } else if (tag.info().name().equals(MsInfo.ProcessName.name())
+          && tag.value() != null) {
+        sn = tag.value();
+      }
+    }
+
+    StringBuilder buf = new StringBuilder();
+    if (!skipHostname && hn != null) {
+      int idx = hn.indexOf(".");
+      if (idx == -1) {
+        buf.append(hn).append(PERIOD);
+      } else {
+        buf.append(hn.substring(0, idx)).append(PERIOD);
+      }
+    }
+    buf.append(sn).append(PERIOD);
+    buf.append(ctx).append(PERIOD);
+    buf.append(record.name().replaceAll("\\.", "-")).append(PERIOD);
+
+    // Collect datapoints.
+    for (AbstractMetric metric : record.metrics()) {
+      String type = null;
+      if (metric.type().equals(MetricType.COUNTER)) {
+        type = "c";
+      } else if (metric.type().equals(MetricType.GAUGE)) {
+        type = "g";
+      }
+      StringBuilder line = new StringBuilder();
+      line.append(buf.toString())
+          .append(metric.name().replace(' ', '_'))
+          .append(":")
+          .append(metric.value())
+          .append("|")
+          .append(type);
+      writeMetric(line.toString());
+    }
+
+  }
+
+  public void writeMetric(String line) {
+    try {
+      statsd.write(line);
+    } catch (IOException e) {
+      LOG.warn("Error sending metrics to StatsD", e);
+      throw new MetricsException("Error writing metric to StatsD", e);
+    }
+  }
+
+  @Override
+  public void flush() {
+  }
+
+  @Override
+  public void close() throws IOException {
+    statsd.close();
+  }
+
+  /**
+   * Class that sends UDP packets to StatsD daemon.
+   *
+   */
+  public static class StatsD {
+
+    private DatagramSocket socket = null;
+    private DatagramPacket packet = null;
+    private String serverHost;
+    private int serverPort;
+
+    public StatsD(String serverHost, int serverPort) {
+      this.serverHost = serverHost;
+      this.serverPort = serverPort;
+    }
+
+    public void createSocket() throws IOException {
+      try {
+        InetSocketAddress address =
+            new InetSocketAddress(this.serverHost, this.serverPort);
+        socket = new DatagramSocket();
+        packet =
+            new DatagramPacket("".getBytes(StandardCharsets.UTF_8), 0, 0,
+                address.getAddress(), this.serverPort);
+      } catch (IOException ioe) {
+        throw NetUtils.wrapException(this.serverHost, this.serverPort,
+            "localhost", 0, ioe);
+      }
+    }
+
+    public void write(String msg) throws IOException {
+      if (null == socket) {
+        createSocket();
+      }
+      LOG.debug("Sending metric: {}", msg);
+      packet.setData(msg.getBytes(StandardCharsets.UTF_8));
+      socket.send(packet);
+    }
+
+    public void close() throws IOException {
+      try {
+        if (socket != null) {
+          socket.close();
+        }
+      } finally {
+        socket = null;
+      }
+    }
+
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestStatsDMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestStatsDMetrics.java
new file mode 100644
index 00000000000..355c1cb5abb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestStatsDMetrics.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricType;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.sink.StatsDSink;
+import org.apache.hadoop.metrics2.sink.StatsDSink.StatsD;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+public class TestStatsDMetrics {
+
+  private AbstractMetric makeMetric(String name, Number value,
+      MetricType type) {
+    AbstractMetric metric = mock(AbstractMetric.class);
+    when(metric.name()).thenReturn(name);
+    when(metric.value()).thenReturn(value);
+    when(metric.type()).thenReturn(type);
+    return metric;
+  }
+
+  @Test(timeout=3000)
+  public void testPutMetrics() throws IOException, InterruptedException {
+    final StatsDSink sink = new StatsDSink();
+    List<MetricsTag> tags = new ArrayList<MetricsTag>();
+    tags.add(new MetricsTag(MsInfo.Hostname, "host"));
+    tags.add(new MetricsTag(MsInfo.Context, "jvm"));
+    tags.add(new MetricsTag(MsInfo.ProcessName, "process"));
+    Set<AbstractMetric> metrics = new HashSet<AbstractMetric>();
+    metrics.add(makeMetric("foo1", 1.25, MetricType.COUNTER));
+    metrics.add(makeMetric("foo2", 2.25, MetricType.GAUGE));
+    final MetricsRecord record =
+        new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics);
+
+    try (DatagramSocket sock = new DatagramSocket()) {
+      sock.setReceiveBufferSize(8192);
+      final StatsDSink.StatsD mockStatsD =
+          new StatsD(sock.getLocalAddress().getHostName(),
+              sock.getLocalPort());
+      Whitebox.setInternalState(sink, "statsd", mockStatsD);
+      final DatagramPacket p = new DatagramPacket(new byte[8192], 8192);
+      sink.putMetrics(record);
+      sock.receive(p);
+
+      String result = new String(p.getData(), 0, p.getLength(),
+          Charset.forName("UTF-8"));
+      assertTrue(
+          "Received data did not match data sent",
+          result.equals("host.process.jvm.Context.foo1:1.25|c") ||
+          result.equals("host.process.jvm.Context.foo2:2.25|g"));
+
+    } finally {
+      sink.close();
+    }
+  }
+
+  @Test(timeout=3000)
+  public void testPutMetrics2() throws IOException {
+    StatsDSink sink = new StatsDSink();
+    List<MetricsTag> tags = new ArrayList<MetricsTag>();
+    tags.add(new MetricsTag(MsInfo.Hostname, null));
+    tags.add(new MetricsTag(MsInfo.Context, "jvm"));
+    tags.add(new MetricsTag(MsInfo.ProcessName, "process"));
+    Set<AbstractMetric> metrics = new HashSet<AbstractMetric>();
+    metrics.add(makeMetric("foo1", 1, MetricType.COUNTER));
+    metrics.add(makeMetric("foo2", 2, MetricType.GAUGE));
+    MetricsRecord record =
+        new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics);
+
+    try (DatagramSocket sock = new DatagramSocket()) {
+      sock.setReceiveBufferSize(8192);
+      final StatsDSink.StatsD mockStatsD =
+          new StatsD(sock.getLocalAddress().getHostName(),
+              sock.getLocalPort());
+      Whitebox.setInternalState(sink, "statsd", mockStatsD);
+      final DatagramPacket p = new DatagramPacket(new byte[8192], 8192);
+      sink.putMetrics(record);
+      sock.receive(p);
+      String result =
+          new String(p.getData(), 0, p.getLength(), Charset.forName("UTF-8"));
+
+      assertTrue("Received data did not match data sent",
+          result.equals("process.jvm.Context.foo1:1|c") ||
+          result.equals("process.jvm.Context.foo2:2|g"));
+    } finally {
+      sink.close();
+    }
+  }
+
+}
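
Example configuration (editor's illustration, not part of the patch): the StatsDSink javadoc above lists the property keys the sink reads; a hadoop-metrics2.properties fragment wiring the NameNode to this sink could look like the following, where the "namenode" prefix, the address 127.0.0.1, and port 8125 are assumed values for a StatsD daemon running locally:

namenode.sink.statsd.class=org.apache.hadoop.metrics2.sink.StatsDSink
namenode.sink.statsd.server.host=127.0.0.1
namenode.sink.statsd.server.port=8125
namenode.sink.statsd.service.name=NameNode
namenode.sink.statsd.skip.hostname=true

With skip.hostname=true the sink omits the host prefix, so per putMetrics() a counter from the jvm context would be emitted roughly as NameNode.jvm.JvmMetrics.GcCount:5|c (record, metric name, and value here are made up for illustration); with skip.hostname=false the short hostname is prepended, truncated at the first dot.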
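For completeness, a minimal sketch (again not part of the patch) of driving the public StatsDSink.StatsD helper directly, as the unit test above does with a loopback DatagramSocket. The host "localhost" and port 8125 are assumptions about where a StatsD daemon (or a test socket) listens; in a real deployment the metrics2 framework instantiates StatsDSink itself and calls putMetrics().

import java.io.IOException;

import org.apache.hadoop.metrics2.sink.StatsDSink.StatsD;

public class StatsDWriteSketch {
  public static void main(String[] args) throws IOException {
    // Host and port are illustrative; UDP sends succeed even with no listener.
    StatsD statsd = new StatsD("localhost", 8125);
    try {
      // Same wire format putMetrics() builds: name:value|type ("c" counter, "g" gauge).
      statsd.write("NameNode.jvm.JvmMetrics.GcCount:5|c");
      statsd.write("NameNode.jvm.JvmMetrics.MemHeapUsedM:128|g");
    } finally {
      statsd.close(); // closes the underlying DatagramSocket
    }
  }
}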