HADOOP-6977. Herriot daemon clients should vend statistics. Contributed by Konstantin Boudnik
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1033812 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fa87ae8058
commit
50659f1a76
|
@ -166,6 +166,8 @@ Trunk (unreleased changes)
|
||||||
HADOOP-4675. Current Ganglia metrics implementation is incompatible with
|
HADOOP-4675. Current Ganglia metrics implementation is incompatible with
|
||||||
Ganglia 3.1. (Brian Bockelman via tomwhite)
|
Ganglia 3.1. (Brian Bockelman via tomwhite)
|
||||||
|
|
||||||
|
HADOOP-6977. Herriot daemon clients should vend statistics (cos)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..).
|
HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..).
|
||||||
|
|
|
@ -19,17 +19,21 @@
|
||||||
package org.apache.hadoop.test.system;
|
package org.apache.hadoop.test.system;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.ConcurrentModificationException;
|
|
||||||
import java.util.List;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.test.system.process.RemoteProcess;
|
import org.apache.hadoop.test.system.process.RemoteProcess;
|
||||||
|
|
||||||
|
import javax.management.*;
|
||||||
|
import javax.management.remote.JMXConnector;
|
||||||
|
import javax.management.remote.JMXConnectorFactory;
|
||||||
|
import javax.management.remote.JMXServiceURL;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstract class which encapsulates the DaemonClient which is used in the
|
* Abstract class which encapsulates the DaemonClient which is used in the
|
||||||
* system tests.<br/>
|
* system tests.<br/>
|
||||||
|
@ -38,10 +42,15 @@ import org.apache.hadoop.test.system.process.RemoteProcess;
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractDaemonClient<PROXY extends DaemonProtocol> {
|
public abstract class AbstractDaemonClient<PROXY extends DaemonProtocol> {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
private Boolean jmxEnabled = null;
|
||||||
|
private MBeanServerConnection connection;
|
||||||
|
private int jmxPortNumber = -1;
|
||||||
private RemoteProcess process;
|
private RemoteProcess process;
|
||||||
private boolean connected;
|
private boolean connected;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(AbstractDaemonClient.class);
|
private static final Log LOG = LogFactory.getLog(AbstractDaemonClient.class);
|
||||||
|
private static final String HADOOP_JMX_DOMAIN = "Hadoop";
|
||||||
|
private static final String HADOOP_OPTS_ENV = "HADOOP_OPTS";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a Daemon client.<br/>
|
* Create a Daemon client.<br/>
|
||||||
|
@ -79,7 +88,7 @@ public abstract class AbstractDaemonClient<PROXY extends DaemonProtocol> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Disconnect the underlying RPC proxy to the daemon.<br/>
|
* Disconnect the underlying RPC proxy to the daemon.<br/>
|
||||||
* @throws IOException
|
* @throws IOException in case of communication errors
|
||||||
*/
|
*/
|
||||||
public abstract void disconnect() throws IOException;
|
public abstract void disconnect() throws IOException;
|
||||||
|
|
||||||
|
@ -153,6 +162,92 @@ public abstract class AbstractDaemonClient<PROXY extends DaemonProtocol> {
|
||||||
return getProxy().getProcessInfo();
|
return getProxy().getProcessInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract method to retrieve the name of a daemon specific env. var
|
||||||
|
* @return name of Hadoop environment variable containing a daemon options
|
||||||
|
*/
|
||||||
|
abstract public String getHadoopOptsEnvName ();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks remote daemon process info to see if certain JMX sys. properties
|
||||||
|
* are available and reckon if the JMX service is enabled on the remote side
|
||||||
|
*
|
||||||
|
* @return <code>boolean</code> code indicating availability of remote JMX
|
||||||
|
* @throws IOException is throws in case of communication errors
|
||||||
|
*/
|
||||||
|
public boolean isJmxEnabled() throws IOException {
|
||||||
|
return isJmxEnabled(HADOOP_OPTS_ENV) ||
|
||||||
|
isJmxEnabled(getHadoopOptsEnvName());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks remote daemon process info to see if certain JMX sys. properties
|
||||||
|
* are available and reckon if the JMX service is enabled on the remote side
|
||||||
|
*
|
||||||
|
* @param envivar name of an evironment variable to be searched
|
||||||
|
* @return <code>boolean</code> code indicating availability of remote JMX
|
||||||
|
* @throws IOException is throws in case of communication errors
|
||||||
|
*/
|
||||||
|
protected boolean isJmxEnabled(String envivar) throws IOException {
|
||||||
|
if (jmxEnabled != null) return jmxEnabled;
|
||||||
|
boolean ret = false;
|
||||||
|
String jmxRemoteString = "-Dcom.sun.management.jmxremote";
|
||||||
|
String hadoopOpts = getProcessInfo().getEnv().get(envivar);
|
||||||
|
LOG.debug("Looking into " + hadoopOpts + " from " + envivar);
|
||||||
|
List<String> options = Arrays.asList(hadoopOpts.split(" "));
|
||||||
|
ret = options.contains(jmxRemoteString);
|
||||||
|
jmxEnabled = ret;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks remote daemon process info to find remote JMX server port number
|
||||||
|
* By default this method will look into "HADOOP_OPTS" variable only.
|
||||||
|
* @return number of remote JMX server or -1 if it can't be found
|
||||||
|
* @throws IOException is throws in case of communication errors
|
||||||
|
* @throws IllegalArgumentException if non-integer port is set
|
||||||
|
* in the remote process info
|
||||||
|
*/
|
||||||
|
public int getJmxPortNumber() throws IOException, IllegalArgumentException {
|
||||||
|
int portNo = getJmxPortNumber(HADOOP_OPTS_ENV);
|
||||||
|
return portNo != -1 ? portNo : getJmxPortNumber(getHadoopOptsEnvName());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks remote daemon process info to find remote JMX server port number
|
||||||
|
*
|
||||||
|
* @param envivar name of the env. var. to look for JMX specific settings
|
||||||
|
* @return number of remote JMX server or -1 if it can't be found
|
||||||
|
* @throws IOException is throws in case of communication errors
|
||||||
|
* @throws IllegalArgumentException if non-integer port is set
|
||||||
|
* in the remote process info
|
||||||
|
*/
|
||||||
|
protected int getJmxPortNumber(final String envivar) throws
|
||||||
|
IOException, IllegalArgumentException {
|
||||||
|
if (jmxPortNumber != -1) return jmxPortNumber;
|
||||||
|
String jmxPortString = "-Dcom.sun.management.jmxremote.port";
|
||||||
|
|
||||||
|
String hadoopOpts = getProcessInfo().getEnv().get(envivar);
|
||||||
|
int portNumber = -1;
|
||||||
|
boolean found = false;
|
||||||
|
String[] options = hadoopOpts.split(" ");
|
||||||
|
for (String option : options) {
|
||||||
|
if (option.startsWith(jmxPortString)) {
|
||||||
|
found = true;
|
||||||
|
try {
|
||||||
|
portNumber = Integer.parseInt(option.split("=")[1]);
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw new IllegalArgumentException("JMX port number isn't integer");
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!found)
|
||||||
|
throw new IllegalArgumentException("Can't detect JMX port number");
|
||||||
|
jmxPortNumber = portNumber;
|
||||||
|
return jmxPortNumber;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a file status object that represents the path.
|
* Return a file status object that represents the path.
|
||||||
* @param path
|
* @param path
|
||||||
|
@ -160,8 +255,7 @@ public abstract class AbstractDaemonClient<PROXY extends DaemonProtocol> {
|
||||||
* @param local
|
* @param local
|
||||||
* whether the path is local or not
|
* whether the path is local or not
|
||||||
* @return a FileStatus object
|
* @return a FileStatus object
|
||||||
* @throws java.io.FileNotFoundException when the path does not exist;
|
* @throws IOException see specific implementation
|
||||||
* IOException see specific implementation
|
|
||||||
*/
|
*/
|
||||||
public FileStatus getFileStatus(String path, boolean local) throws IOException {
|
public FileStatus getFileStatus(String path, boolean local) throws IOException {
|
||||||
return getProxy().getFileStatus(path, local);
|
return getProxy().getFileStatus(path, local);
|
||||||
|
@ -275,7 +369,7 @@ public abstract class AbstractDaemonClient<PROXY extends DaemonProtocol> {
|
||||||
* Pattern used for searching is FATAL. <br/>
|
* Pattern used for searching is FATAL. <br/>
|
||||||
* @param excludeExpList list of exception to exclude
|
* @param excludeExpList list of exception to exclude
|
||||||
* @return number of occurrence of fatal message.
|
* @return number of occurrence of fatal message.
|
||||||
* @throws IOException
|
* @throws IOException in case of communication errors
|
||||||
*/
|
*/
|
||||||
public int getNumberOfFatalStatementsInLog(String [] excludeExpList)
|
public int getNumberOfFatalStatementsInLog(String [] excludeExpList)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -391,4 +485,115 @@ public abstract class AbstractDaemonClient<PROXY extends DaemonProtocol> {
|
||||||
"New ConcurrentModificationException in log file",
|
"New ConcurrentModificationException in log file",
|
||||||
concurrentExceptionCount, newconcurrentExceptionCount);
|
concurrentExceptionCount, newconcurrentExceptionCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builds correct name of JMX object name from given domain, service name, type
|
||||||
|
* @param domain JMX domain name
|
||||||
|
* @param serviceName of the service where MBean is registered (NameNode)
|
||||||
|
* @param typeName of the MXBean class
|
||||||
|
* @return ObjectName for requested MXBean of <code>null</code> if one wasn't
|
||||||
|
* found
|
||||||
|
* @throws java.io.IOException in if object name is malformed
|
||||||
|
*/
|
||||||
|
protected ObjectName getJmxBeanName(String domain, String serviceName,
|
||||||
|
String typeName) throws IOException {
|
||||||
|
if (domain == null)
|
||||||
|
domain = HADOOP_JMX_DOMAIN;
|
||||||
|
|
||||||
|
ObjectName jmxBean;
|
||||||
|
try {
|
||||||
|
jmxBean = new ObjectName(domain + ":service=" + serviceName +
|
||||||
|
",name=" + typeName);
|
||||||
|
} catch (MalformedObjectNameException e) {
|
||||||
|
LOG.debug(e.getStackTrace());
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
return jmxBean;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create connection with the remote JMX server at given host and port
|
||||||
|
* @param host name of the remote JMX server host
|
||||||
|
* @param port port number of the remote JXM server host
|
||||||
|
* @return instance of MBeanServerConnection or <code>null</code> if one
|
||||||
|
* hasn't been established
|
||||||
|
* @throws IOException in case of comminication errors
|
||||||
|
*/
|
||||||
|
protected MBeanServerConnection establishJmxConnection(String host, int port)
|
||||||
|
throws IOException {
|
||||||
|
if (connection != null) return connection;
|
||||||
|
String urlPattern = null;
|
||||||
|
try {
|
||||||
|
urlPattern = "service:jmx:rmi:///jndi/rmi://" +
|
||||||
|
host + ":" + port +
|
||||||
|
"/jmxrmi";
|
||||||
|
JMXServiceURL url = new JMXServiceURL(urlPattern);
|
||||||
|
JMXConnector connector = JMXConnectorFactory.connect(url, null);
|
||||||
|
connection = connector.getMBeanServerConnection();
|
||||||
|
} catch (java.net.MalformedURLException badURLExc) {
|
||||||
|
LOG.debug("bad url: " + urlPattern, badURLExc);
|
||||||
|
throw new IOException(badURLExc);
|
||||||
|
}
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
Hashtable<String, ObjectName> jmxObjectNames =
|
||||||
|
new Hashtable<String, ObjectName>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method implements all logic for receiving a bean's attribute.
|
||||||
|
* If any initializations such as establishing bean server connections, etc.
|
||||||
|
* are need it will do it.
|
||||||
|
* @param serviceName name of the service where MBean is registered (NameNode)
|
||||||
|
* @param type name of the MXBean class
|
||||||
|
* @param attributeName name of the attribute to be retrieved
|
||||||
|
* @return Object value of the attribute or <code>null</code> if not found
|
||||||
|
* @throws IOException is thrown in case of any errors
|
||||||
|
*/
|
||||||
|
protected Object getJmxAttribute (String serviceName,
|
||||||
|
String type,
|
||||||
|
String attributeName)
|
||||||
|
throws IOException {
|
||||||
|
Object retAttribute = null;
|
||||||
|
String domain = null;
|
||||||
|
if (isJmxEnabled()) {
|
||||||
|
try {
|
||||||
|
MBeanServerConnection conn =
|
||||||
|
establishJmxConnection(getHostName(),
|
||||||
|
getJmxPortNumber(HADOOP_OPTS_ENV));
|
||||||
|
for (String d : conn.getDomains()) {
|
||||||
|
if (d != null && d.startsWith(HADOOP_JMX_DOMAIN))
|
||||||
|
domain = d;
|
||||||
|
}
|
||||||
|
if (!jmxObjectNames.containsKey(type))
|
||||||
|
jmxObjectNames.put(type, getJmxBeanName(domain, serviceName, type));
|
||||||
|
retAttribute =
|
||||||
|
conn.getAttribute(jmxObjectNames.get(type), attributeName);
|
||||||
|
} catch (MBeanException e) {
|
||||||
|
LOG.debug(e.getStackTrace());
|
||||||
|
throw new IOException(e);
|
||||||
|
} catch (AttributeNotFoundException e) {
|
||||||
|
LOG.warn(e.getStackTrace());
|
||||||
|
throw new IOException(e);
|
||||||
|
} catch (InstanceNotFoundException e) {
|
||||||
|
LOG.warn(e.getStackTrace());
|
||||||
|
throw new IOException(e);
|
||||||
|
} catch (ReflectionException e) {
|
||||||
|
LOG.debug(e.getStackTrace());
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return retAttribute;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method has to be implemented by appropriate concrete daemon client
|
||||||
|
* e.g. DNClient, NNClient, etc.
|
||||||
|
* Concrete implementation has to provide names of the service and bean type
|
||||||
|
* @param attributeName name of the attribute to be retrieved
|
||||||
|
* @return Object value of the given attribute
|
||||||
|
* @throws IOException is thrown in case of communication errors
|
||||||
|
*/
|
||||||
|
public abstract Object getDaemonAttribute (String attributeName)
|
||||||
|
throws IOException;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue