HADOOP-10181. GangliaContext does not work with multicast ganglia setup. Contributed by Andrew Johnson.

(cherry picked from commit 8004a00230)

Conflicts:
	hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java
This commit is contained in:
cnauroth 2015-02-02 11:09:09 -08:00
parent 05d831c4b7
commit aee297ccea
6 changed files with 185 additions and 18 deletions

View File

@ -455,6 +455,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11432. Fix SymlinkBaseTest#testCreateLinkUsingPartQualPath2.
(Liang Xie via gera)
HADOOP-10181. GangliaContext does not work with multicast ganglia setup.
(Andrew Johnson via cnauroth)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -21,10 +21,7 @@
package org.apache.hadoop.metrics.ganglia;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -54,13 +51,16 @@ public class GangliaContext extends AbstractMetricsContext {
private static final String SLOPE_PROPERTY = "slope";
private static final String TMAX_PROPERTY = "tmax";
private static final String DMAX_PROPERTY = "dmax";
private static final String MULTICAST_PROPERTY = "multicast";
private static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";
private static final String DEFAULT_UNITS = "";
private static final String DEFAULT_SLOPE = "both";
private static final int DEFAULT_TMAX = 60;
private static final int DEFAULT_DMAX = 0;
private static final int DEFAULT_PORT = 8649;
private static final int BUFFER_SIZE = 1500; // as per libgmond.c
private static final int DEFAULT_MULTICAST_TTL = 1;
private final Log LOG = LogFactory.getLog(this.getClass());
@ -83,6 +83,8 @@ public class GangliaContext extends AbstractMetricsContext {
private Map<String,String> slopeTable;
private Map<String,String> tmaxTable;
private Map<String,String> dmaxTable;
private boolean multicastEnabled;
private int multicastTtl;
protected DatagramSocket datagramSocket;
@ -91,6 +93,7 @@ public class GangliaContext extends AbstractMetricsContext {
public GangliaContext() {
}
@Override
@InterfaceAudience.Private
public void init(String contextName, ContextFactory factory) {
super.init(contextName, factory);
@ -103,12 +106,26 @@ public void init(String contextName, ContextFactory factory) {
slopeTable = getAttributeTable(SLOPE_PROPERTY);
tmaxTable = getAttributeTable(TMAX_PROPERTY);
dmaxTable = getAttributeTable(DMAX_PROPERTY);
multicastEnabled = Boolean.parseBoolean(getAttribute(MULTICAST_PROPERTY));
String multicastTtlValue = getAttribute(MULTICAST_TTL_PROPERTY);
if (multicastEnabled) {
if (multicastTtlValue == null) {
multicastTtl = DEFAULT_MULTICAST_TTL;
} else {
multicastTtl = Integer.parseInt(multicastTtlValue);
}
}
try {
datagramSocket = new DatagramSocket();
}
catch (SocketException se) {
se.printStackTrace();
if (multicastEnabled) {
LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
datagramSocket = new MulticastSocket();
((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
} else {
datagramSocket = new DatagramSocket();
}
} catch (IOException e) {
LOG.error(e);
}
}
@ -123,6 +140,7 @@ public void close() {
}
}
@Override
@InterfaceAudience.Private
public void emitRecord(String contextName, String recordName,
OutputRecord outRec)

View File

@ -54,6 +54,12 @@
<dt><i>contextName</i>.period</dt>
<dd>The period in seconds on which the metric data is sent to the
server(s).</dd>
<dt><i>contextName</i>.multicast</dt>
<dd>Enable multicast for Ganglia</dd>
<dt><i>contextName</i>.multicast.ttl</dt>
<dd>TTL for multicast packets</dd>
<dt><i>contextName</i>.units.<i>recordName</i>.<i>metricName</i></dt>
<dd>The units for the specified metric in the specified record.</dd>

View File

@ -19,12 +19,7 @@
package org.apache.hadoop.metrics2.sink.ganglia;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.net.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -64,7 +59,11 @@ public abstract class AbstractGangliaSink implements MetricsSink {
public static final int DEFAULT_DMAX = 0;
public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both;
public static final int DEFAULT_PORT = 8649;
public static final boolean DEFAULT_MULTICAST_ENABLED = false;
public static final int DEFAULT_MULTICAST_TTL = 1;
public static final String SERVERS_PROPERTY = "servers";
public static final String MULTICAST_ENABLED_PROPERTY = "multicast";
public static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";
public static final int BUFFER_SIZE = 1500; // as per libgmond.c
public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
@ -73,6 +72,8 @@ public abstract class AbstractGangliaSink implements MetricsSink {
private String hostName = "UNKNOWN.example.com";
private DatagramSocket datagramSocket;
private List<? extends SocketAddress> metricsServers;
private boolean multicastEnabled;
private int multicastTtl;
private byte[] buffer = new byte[BUFFER_SIZE];
private int offset;
private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
@ -133,6 +134,9 @@ public void init(SubsetConfiguration conf) {
// load the gannglia servers from properties
metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
DEFAULT_PORT);
multicastEnabled = conf.getBoolean(MULTICAST_ENABLED_PROPERTY,
DEFAULT_MULTICAST_ENABLED);
multicastTtl = conf.getInt(MULTICAST_TTL_PROPERTY, DEFAULT_MULTICAST_TTL);
// extract the Ganglia conf per metrics
gangliaConfMap = new HashMap<String, GangliaConf>();
@ -142,9 +146,15 @@ public void init(SubsetConfiguration conf) {
loadGangliaConf(GangliaConfType.slope);
try {
datagramSocket = new DatagramSocket();
} catch (SocketException se) {
LOG.error(se);
if (multicastEnabled) {
LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
datagramSocket = new MulticastSocket();
((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
} else {
datagramSocket = new DatagramSocket();
}
} catch (IOException e) {
LOG.error(e);
}
// see if sparseMetrics is supported. Default is false
@ -293,4 +303,12 @@ protected boolean isSupportSparseMetrics() {
void setDatagramSocket(DatagramSocket datagramSocket) {
this.datagramSocket = datagramSocket;
}
/**
* Used only by unit tests
* @return the datagramSocket for this sink
*/
DatagramSocket getDatagramSocket() {
return datagramSocket;
}
}

View File

@ -22,13 +22,54 @@
package org.apache.hadoop.metrics.ganglia;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
import java.net.MulticastSocket;
public class TestGangliaContext {
@Test
public void testShouldCreateDatagramSocketByDefault() throws Exception {
GangliaContext context = new GangliaContext();
context.init("gangliaContext", ContextFactory.getFactory());
assertFalse("Created MulticastSocket", context.datagramSocket instanceof MulticastSocket);
}
@Test
public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws Exception {
GangliaContext context = new GangliaContext();
ContextFactory factory = ContextFactory.getFactory();
factory.setAttribute("gangliaContext.multicast", "false");
context.init("gangliaContext", factory);
assertFalse("Created MulticastSocket", context.datagramSocket instanceof MulticastSocket);
}
@Test
public void testShouldCreateMulticastSocket() throws Exception {
GangliaContext context = new GangliaContext();
ContextFactory factory = ContextFactory.getFactory();
factory.setAttribute("gangliaContext.multicast", "true");
context.init("gangliaContext", factory);
assertTrue("Did not create MulticastSocket", context.datagramSocket instanceof MulticastSocket);
MulticastSocket multicastSocket = (MulticastSocket) context.datagramSocket;
assertEquals("Did not set default TTL", multicastSocket.getTimeToLive(), 1);
}
@Test
public void testShouldSetMulticastSocketTtl() throws Exception {
GangliaContext context = new GangliaContext();
ContextFactory factory = ContextFactory.getFactory();
factory.setAttribute("gangliaContext.multicast", "true");
factory.setAttribute("gangliaContext.multicast.ttl", "10");
context.init("gangliaContext", factory);
MulticastSocket multicastSocket = (MulticastSocket) context.datagramSocket;
assertEquals("Did not set TTL", multicastSocket.getTimeToLive(), 10);
}
@Test
public void testCloseShouldCloseTheSocketWhichIsCreatedByInit() throws Exception {

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.sink.ganglia;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.metrics2.impl.ConfigBuilder;
import org.junit.Test;
import java.net.DatagramSocket;
import java.net.MulticastSocket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestGangliaSink {
@Test
public void testShouldCreateDatagramSocketByDefault() throws Exception {
SubsetConfiguration conf = new ConfigBuilder()
.subset("test.sink.ganglia");
GangliaSink30 gangliaSink = new GangliaSink30();
gangliaSink.init(conf);
DatagramSocket socket = gangliaSink.getDatagramSocket();
assertFalse("Did not create DatagramSocket", socket == null || socket instanceof MulticastSocket);
}
@Test
public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws Exception {
SubsetConfiguration conf = new ConfigBuilder()
.add("test.sink.ganglia.multicast", false)
.subset("test.sink.ganglia");
GangliaSink30 gangliaSink = new GangliaSink30();
gangliaSink.init(conf);
DatagramSocket socket = gangliaSink.getDatagramSocket();
assertFalse("Did not create DatagramSocket", socket == null || socket instanceof MulticastSocket);
}
@Test
public void testShouldCreateMulticastSocket() throws Exception {
SubsetConfiguration conf = new ConfigBuilder()
.add("test.sink.ganglia.multicast", true)
.subset("test.sink.ganglia");
GangliaSink30 gangliaSink = new GangliaSink30();
gangliaSink.init(conf);
DatagramSocket socket = gangliaSink.getDatagramSocket();
assertTrue("Did not create MulticastSocket", socket != null && socket instanceof MulticastSocket);
int ttl = ((MulticastSocket) socket).getTimeToLive();
assertEquals("Did not set default TTL", 1, ttl);
}
@Test
public void testShouldSetMulticastSocketTtl() throws Exception {
SubsetConfiguration conf = new ConfigBuilder()
.add("test.sink.ganglia.multicast", true)
.add("test.sink.ganglia.multicast.ttl", 3)
.subset("test.sink.ganglia");
GangliaSink30 gangliaSink = new GangliaSink30();
gangliaSink.init(conf);
DatagramSocket socket = gangliaSink.getDatagramSocket();
assertTrue("Did not create MulticastSocket", socket != null && socket instanceof MulticastSocket);
int ttl = ((MulticastSocket) socket).getTimeToLive();
assertEquals("Did not set TTL", 3, ttl);
}
}