From aee297ccea383bd8ef988e450c9b9937a3d74746 Mon Sep 17 00:00:00 2001 From: cnauroth Date: Mon, 2 Feb 2015 11:09:09 -0800 Subject: [PATCH] HADOOP-10181. GangliaContext does not work with multicast ganglia setup. Contributed by Andrew Johnson. (cherry picked from commit 8004a002307940176cc188657c68e85171a5b5a8) Conflicts: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java --- .../hadoop-common/CHANGES.txt | 3 + .../metrics/ganglia/GangliaContext.java | 36 ++++++--- .../hadoop/metrics/ganglia/package.html | 6 ++ .../sink/ganglia/AbstractGangliaSink.java | 36 ++++++--- .../metrics/ganglia/TestGangliaContext.java | 41 ++++++++++ .../sink/ganglia/TestGangliaSink.java | 81 +++++++++++++++++++ 6 files changed, 185 insertions(+), 18 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 7e740695622..be3fdc5d9a4 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -455,6 +455,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11432. Fix SymlinkBaseTest#testCreateLinkUsingPartQualPath2. (Liang Xie via gera) + HADOOP-10181. GangliaContext does not work with multicast ganglia setup. + (Andrew Johnson via cnauroth) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java index 7aaf0afeced..5ed2652b11c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java @@ -21,10 +21,7 @@ package org.apache.hadoop.metrics.ganglia; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.SocketAddress; -import java.net.SocketException; +import java.net.*; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,13 +51,16 @@ public class GangliaContext extends AbstractMetricsContext { private static final String SLOPE_PROPERTY = "slope"; private static final String TMAX_PROPERTY = "tmax"; private static final String DMAX_PROPERTY = "dmax"; - + private static final String MULTICAST_PROPERTY = "multicast"; + private static final String MULTICAST_TTL_PROPERTY = "multicast.ttl"; + private static final String DEFAULT_UNITS = ""; private static final String DEFAULT_SLOPE = "both"; private static final int DEFAULT_TMAX = 60; private static final int DEFAULT_DMAX = 0; private static final int DEFAULT_PORT = 8649; private static final int BUFFER_SIZE = 1500; // as per libgmond.c + private static final int DEFAULT_MULTICAST_TTL = 1; private final Log LOG = LogFactory.getLog(this.getClass()); @@ -83,6 +83,8 @@ public class GangliaContext extends AbstractMetricsContext { private Map slopeTable; private Map tmaxTable; private Map dmaxTable; + private boolean multicastEnabled; + private int multicastTtl; protected DatagramSocket datagramSocket; @@ -91,6 +93,7 @@ public class GangliaContext extends AbstractMetricsContext { public GangliaContext() { } + @Override @InterfaceAudience.Private public void init(String contextName, ContextFactory factory) { super.init(contextName, factory); @@ -103,12 +106,26 @@ public class GangliaContext extends AbstractMetricsContext { slopeTable = getAttributeTable(SLOPE_PROPERTY); tmaxTable = getAttributeTable(TMAX_PROPERTY); dmaxTable = getAttributeTable(DMAX_PROPERTY); + multicastEnabled = Boolean.parseBoolean(getAttribute(MULTICAST_PROPERTY)); + String multicastTtlValue = getAttribute(MULTICAST_TTL_PROPERTY); + if (multicastEnabled) { + if (multicastTtlValue == null) { + multicastTtl = DEFAULT_MULTICAST_TTL; + } else { + multicastTtl = Integer.parseInt(multicastTtlValue); + } + } try { - datagramSocket = new DatagramSocket(); - } - catch (SocketException se) { - se.printStackTrace(); + if (multicastEnabled) { + LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl); + datagramSocket = new MulticastSocket(); + ((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl); + } else { + datagramSocket = new DatagramSocket(); + } + } catch (IOException e) { + LOG.error(e); } } @@ -123,6 +140,7 @@ public class GangliaContext extends AbstractMetricsContext { } } + @Override @InterfaceAudience.Private public void emitRecord(String contextName, String recordName, OutputRecord outRec) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html index 87598e50332..b9acfaea699 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html @@ -54,6 +54,12 @@ These are the implementation specific factory attributes
contextName.period
The period in seconds on which the metric data is sent to the server(s).
+ +
contextName.multicast
+
Enable multicast for Ganglia
+ +
contextName.multicast.ttl
+
TTL for multicast packets
contextName.units.recordName.metricName
The units for the specified metric in the specified record.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java index 6757068b391..9a68f31c299 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java @@ -19,12 +19,7 @@ package org.apache.hadoop.metrics2.sink.ganglia; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.SocketException; -import java.net.UnknownHostException; +import java.net.*; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,7 +59,11 @@ public abstract class AbstractGangliaSink implements MetricsSink { public static final int DEFAULT_DMAX = 0; public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both; public static final int DEFAULT_PORT = 8649; + public static final boolean DEFAULT_MULTICAST_ENABLED = false; + public static final int DEFAULT_MULTICAST_TTL = 1; public static final String SERVERS_PROPERTY = "servers"; + public static final String MULTICAST_ENABLED_PROPERTY = "multicast"; + public static final String MULTICAST_TTL_PROPERTY = "multicast.ttl"; public static final int BUFFER_SIZE = 1500; // as per libgmond.c public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse"; public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false; @@ -73,6 +72,8 @@ public abstract class AbstractGangliaSink implements MetricsSink { private String hostName = "UNKNOWN.example.com"; private DatagramSocket datagramSocket; private List metricsServers; + private boolean multicastEnabled; + private int multicastTtl; private byte[] buffer = new byte[BUFFER_SIZE]; private int offset; private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT; @@ -133,6 +134,9 @@ public abstract class AbstractGangliaSink implements MetricsSink { // load the gannglia servers from properties metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY), DEFAULT_PORT); + multicastEnabled = conf.getBoolean(MULTICAST_ENABLED_PROPERTY, + DEFAULT_MULTICAST_ENABLED); + multicastTtl = conf.getInt(MULTICAST_TTL_PROPERTY, DEFAULT_MULTICAST_TTL); // extract the Ganglia conf per metrics gangliaConfMap = new HashMap(); @@ -142,9 +146,15 @@ public abstract class AbstractGangliaSink implements MetricsSink { loadGangliaConf(GangliaConfType.slope); try { - datagramSocket = new DatagramSocket(); - } catch (SocketException se) { - LOG.error(se); + if (multicastEnabled) { + LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl); + datagramSocket = new MulticastSocket(); + ((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl); + } else { + datagramSocket = new DatagramSocket(); + } + } catch (IOException e) { + LOG.error(e); } // see if sparseMetrics is supported. Default is false @@ -293,4 +303,12 @@ public abstract class AbstractGangliaSink implements MetricsSink { void setDatagramSocket(DatagramSocket datagramSocket) { this.datagramSocket = datagramSocket; } + + /** + * Used only by unit tests + * @return the datagramSocket for this sink + */ + DatagramSocket getDatagramSocket() { + return datagramSocket; + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/ganglia/TestGangliaContext.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/ganglia/TestGangliaContext.java index deb8231154c..8637f8ceea4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/ganglia/TestGangliaContext.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/ganglia/TestGangliaContext.java @@ -22,13 +22,54 @@ package org.apache.hadoop.metrics.ganglia; import org.junit.Test; + +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import org.apache.hadoop.metrics.ContextFactory; import org.apache.hadoop.metrics.spi.AbstractMetricsContext; +import java.net.MulticastSocket; + public class TestGangliaContext { + @Test + public void testShouldCreateDatagramSocketByDefault() throws Exception { + GangliaContext context = new GangliaContext(); + context.init("gangliaContext", ContextFactory.getFactory()); + assertFalse("Created MulticastSocket", context.datagramSocket instanceof MulticastSocket); + } + + @Test + public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws Exception { + GangliaContext context = new GangliaContext(); + ContextFactory factory = ContextFactory.getFactory(); + factory.setAttribute("gangliaContext.multicast", "false"); + context.init("gangliaContext", factory); + assertFalse("Created MulticastSocket", context.datagramSocket instanceof MulticastSocket); + } + + @Test + public void testShouldCreateMulticastSocket() throws Exception { + GangliaContext context = new GangliaContext(); + ContextFactory factory = ContextFactory.getFactory(); + factory.setAttribute("gangliaContext.multicast", "true"); + context.init("gangliaContext", factory); + assertTrue("Did not create MulticastSocket", context.datagramSocket instanceof MulticastSocket); + MulticastSocket multicastSocket = (MulticastSocket) context.datagramSocket; + assertEquals("Did not set default TTL", multicastSocket.getTimeToLive(), 1); + } + + @Test + public void testShouldSetMulticastSocketTtl() throws Exception { + GangliaContext context = new GangliaContext(); + ContextFactory factory = ContextFactory.getFactory(); + factory.setAttribute("gangliaContext.multicast", "true"); + factory.setAttribute("gangliaContext.multicast.ttl", "10"); + context.init("gangliaContext", factory); + MulticastSocket multicastSocket = (MulticastSocket) context.datagramSocket; + assertEquals("Did not set TTL", multicastSocket.getTimeToLive(), 10); + } @Test public void testCloseShouldCloseTheSocketWhichIsCreatedByInit() throws Exception { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java new file mode 100644 index 00000000000..aa2c2591794 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink.ganglia; + +import org.apache.commons.configuration.SubsetConfiguration; +import org.apache.hadoop.metrics2.impl.ConfigBuilder; +import org.junit.Test; + +import java.net.DatagramSocket; +import java.net.MulticastSocket; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestGangliaSink { + @Test + public void testShouldCreateDatagramSocketByDefault() throws Exception { + SubsetConfiguration conf = new ConfigBuilder() + .subset("test.sink.ganglia"); + + GangliaSink30 gangliaSink = new GangliaSink30(); + gangliaSink.init(conf); + DatagramSocket socket = gangliaSink.getDatagramSocket(); + assertFalse("Did not create DatagramSocket", socket == null || socket instanceof MulticastSocket); + } + + @Test + public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws Exception { + SubsetConfiguration conf = new ConfigBuilder() + .add("test.sink.ganglia.multicast", false) + .subset("test.sink.ganglia"); + GangliaSink30 gangliaSink = new GangliaSink30(); + gangliaSink.init(conf); + DatagramSocket socket = gangliaSink.getDatagramSocket(); + assertFalse("Did not create DatagramSocket", socket == null || socket instanceof MulticastSocket); + } + + @Test + public void testShouldCreateMulticastSocket() throws Exception { + SubsetConfiguration conf = new ConfigBuilder() + .add("test.sink.ganglia.multicast", true) + .subset("test.sink.ganglia"); + GangliaSink30 gangliaSink = new GangliaSink30(); + gangliaSink.init(conf); + DatagramSocket socket = gangliaSink.getDatagramSocket(); + assertTrue("Did not create MulticastSocket", socket != null && socket instanceof MulticastSocket); + int ttl = ((MulticastSocket) socket).getTimeToLive(); + assertEquals("Did not set default TTL", 1, ttl); + } + + @Test + public void testShouldSetMulticastSocketTtl() throws Exception { + SubsetConfiguration conf = new ConfigBuilder() + .add("test.sink.ganglia.multicast", true) + .add("test.sink.ganglia.multicast.ttl", 3) + .subset("test.sink.ganglia"); + GangliaSink30 gangliaSink = new GangliaSink30(); + gangliaSink.init(conf); + DatagramSocket socket = gangliaSink.getDatagramSocket(); + assertTrue("Did not create MulticastSocket", socket != null && socket instanceof MulticastSocket); + int ttl = ((MulticastSocket) socket).getTimeToLive(); + assertEquals("Did not set TTL", 3, ttl); + } +}