HDFS-2179. Add fencing framework and mechanisms for NameNode HA. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1153939 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2011-08-04 17:24:57 +00:00
parent 57213dbcb5
commit 05c24937cf
12 changed files with 1264 additions and 0 deletions

View File

@@ -3,3 +3,5 @@ Changes for HDFS-1623 branch.
This change list will be merged into the trunk CHANGES.txt when the HDFS-1623
branch is merged.
------------------------------
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)

View File

@@ -74,6 +74,7 @@
</dependency>
<dependency org="com.google.guava" name="guava" rev="${guava.version}" conf="hdfs->default" />
<dependency org="com.google.protobuf" name="protobuf-java" rev="2.4.0a" conf="common->master"/>
<dependency org="com.jcraft" name="jsch" rev="${jsch.version}" conf="hdfs->default" />
<dependency org="org.apache.hadoop" name="avro" rev="${avro.version}" conf="compile->master">
<exclude module="ant"/>
<exclude module="jetty"/>

View File

@@ -45,6 +45,7 @@ ivy.version=2.1.0
jasper.version=5.5.12
jdeb.version=0.8
jsch.version=0.1.42
jsp.version=2.1
jsp-api.version=5.5.12
jetty.version=6.1.14

View File

@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
/**
* Indicates that the operator has specified an invalid configuration
* for fencing methods.
*/
public class BadFencingConfigurationException extends IOException {
private static final long serialVersionUID = 1L;
public BadFencingConfigurationException(String msg) {
super(msg);
}
public BadFencingConfigurationException(String msg, Throwable cause) {
super(msg, cause);
}
}

View File

@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
/**
* A fencing method is a method by which one node can forcibly prevent
* another node from making continued progress. This might be implemented
* by killing a process on the other node, by denying the other node's
* access to shared storage, or by accessing a PDU to cut the other node's
* power.
* <p>
* Since these methods are often vendor- or device-specific, operators
* may implement this interface in order to achieve fencing.
* <p>
* Fencing is configured by the operator as an ordered list of methods to
* attempt. Each method will be tried in turn, and the next in the list
* will only be attempted if the previous one fails. See {@link NodeFencer}
* for more information.
* <p>
* If an implementation also implements {@link Configurable} then its
* <code>setConf</code> method will be called upon instantiation.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface FenceMethod {
/**
* Verify that the given fencing method's arguments are valid.
* @param args the arguments provided in the configuration. This may
* be null if the operator did not configure any arguments.
* @throws BadFencingConfigurationException if the arguments are invalid
*/
public void checkArgs(String args) throws BadFencingConfigurationException;
/**
* Attempt to fence the target node.
* @param args the configured arguments, which were checked at startup by
* {@link #checkArgs(String)}
* @return true if fencing was successful, false if unsuccessful or
* indeterminate
* @throws BadFencingConfigurationException if the configuration was
* determined to be invalid only at runtime
*/
public boolean tryFence(String args) throws BadFencingConfigurationException;
}
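
To make the FenceMethod contract concrete, here is a minimal sketch of a hypothetical operator-supplied implementation (not part of this commit; the package, class name, and probe-only behavior are all illustrative). It "fences" merely by verifying that nothing answers on the target's port, so it is useful only as a final verification step, not as a real fencing action:

package com.example.ha;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.hadoop.hdfs.server.namenode.ha.BadFencingConfigurationException;
import org.apache.hadoop.hdfs.server.namenode.ha.FenceMethod;

public class PortProbeFencer implements FenceMethod {
  @Override
  public void checkArgs(String args) throws BadFencingConfigurationException {
    // Expect a single "host:port" argument, e.g. "other-nn:8020".
    if (args == null || !args.matches(".+:\\d+")) {
      throw new BadFencingConfigurationException(
          "PortProbeFencer requires a host:port argument, got: " + args);
    }
  }

  @Override
  public boolean tryFence(String args) {
    String[] hostPort = args.split(":");
    Socket sock = new Socket();
    try {
      sock.connect(new InetSocketAddress(
          hostPort[0], Integer.parseInt(hostPort[1])), 5000);
      return false; // something is still listening: not fenced
    } catch (IOException e) {
      return true;  // nothing answered: treat the target as down
    } finally {
      try {
        sock.close();
      } catch (IOException e) {
        // ignore failures while closing the probe socket
      }
    }
  }
}

Such a class would then be listed in the fencing configuration as com.example.ha.PortProbeFencer(other-nn:8020).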

View File

@@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
* This class parses the configured list of fencing methods, and
* is responsible for trying each one in turn while logging informative
* output.<p>
*
 * The fencing methods are configured as a newline-separated list.
* Each line in the list is of the form:<p>
* <code>com.example.foo.MyMethod(arg string)</code>
* or
* <code>com.example.foo.MyMethod</code>
* The class provided must implement the {@link FenceMethod} interface.
* The fencing methods that ship with Hadoop may also be referred to
* by shortened names:<p>
* <ul>
* <li><code>shell(/path/to/some/script.sh args...)</code></li>
 * <li><code>sshfence(...)</code> (see {@link SshFenceByTcpPort})</li>
* </ul>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class NodeFencer {
static final String CONF_METHODS_KEY =
"dfs.namenode.ha.fencing.methods";
private static final String CLASS_RE = "([a-zA-Z0-9\\.\\$]+)";
private static final Pattern CLASS_WITH_ARGUMENT =
Pattern.compile(CLASS_RE + "\\((.+?)\\)");
private static final Pattern CLASS_WITHOUT_ARGUMENT =
Pattern.compile(CLASS_RE);
private static final Pattern HASH_COMMENT_RE =
Pattern.compile("#.*$");
private static final Log LOG = LogFactory.getLog(NodeFencer.class);
/**
* Standard fencing methods included with HDFS.
*/
private static final Map<String, Class<? extends FenceMethod>> STANDARD_METHODS =
ImmutableMap.<String, Class<? extends FenceMethod>>of(
"shell", ShellCommandFencer.class,
"sshfence", SshFenceByTcpPort.class);
private final List<FenceMethodWithArg> methods;
public NodeFencer(Configuration conf)
throws BadFencingConfigurationException {
this.methods = parseMethods(conf);
}
public boolean fence() {
LOG.info("====== Beginning NameNode Fencing Process... ======");
int i = 0;
for (FenceMethodWithArg method : methods) {
LOG.info("Trying method " + (++i) + "/" + methods.size() +": " + method);
try {
if (method.method.tryFence(method.arg)) {
LOG.info("====== Fencing successful by method " + method + " ======");
return true;
}
} catch (BadFencingConfigurationException e) {
LOG.error("Fencing method " + method + " misconfigured", e);
continue;
} catch (Throwable t) {
LOG.error("Fencing method " + method + " failed with an unexpected error.", t);
continue;
}
LOG.warn("Fencing method " + method + " was unsuccessful.");
}
LOG.error("Unable to fence NameNode by any configured method.");
return false;
}
private static List<FenceMethodWithArg> parseMethods(Configuration conf)
throws BadFencingConfigurationException {
String confStr = conf.get(CONF_METHODS_KEY);
if (confStr == null) {
  throw new BadFencingConfigurationException(
      "No fencing methods configured in " + CONF_METHODS_KEY);
}
String[] lines = confStr.split("\\s*\n\\s*");
List<FenceMethodWithArg> methods = Lists.newArrayList();
for (String line : lines) {
line = HASH_COMMENT_RE.matcher(line).replaceAll("");
line = line.trim();
if (!line.isEmpty()) {
methods.add(parseMethod(conf, line));
}
}
return methods;
}
private static FenceMethodWithArg parseMethod(Configuration conf, String line)
throws BadFencingConfigurationException {
Matcher m;
if ((m = CLASS_WITH_ARGUMENT.matcher(line)).matches()) {
String className = m.group(1);
String arg = m.group(2);
return createFenceMethod(conf, className, arg);
} else if ((m = CLASS_WITHOUT_ARGUMENT.matcher(line)).matches()) {
String className = m.group(1);
return createFenceMethod(conf, className, null);
} else {
throw new BadFencingConfigurationException(
"Unable to parse line: '" + line + "'");
}
}
private static FenceMethodWithArg createFenceMethod(
Configuration conf, String clazzName, String arg)
throws BadFencingConfigurationException {
Class<?> clazz;
try {
// See if it's a short name for one of the built-in methods
clazz = STANDARD_METHODS.get(clazzName);
if (clazz == null) {
// Try to instantiate the user's custom method
clazz = Class.forName(clazzName);
}
} catch (Exception e) {
throw new BadFencingConfigurationException(
"Could not find configured fencing method " + clazzName,
e);
}
// Check that it implements the right interface
if (!FenceMethod.class.isAssignableFrom(clazz)) {
throw new BadFencingConfigurationException("Class " + clazzName +
" does not implement FenceMethod");
}
FenceMethod method = (FenceMethod)ReflectionUtils.newInstance(
clazz, conf);
method.checkArgs(arg);
return new FenceMethodWithArg(method, arg);
}
private static class FenceMethodWithArg {
private final FenceMethod method;
private final String arg;
private FenceMethodWithArg(FenceMethod method, String arg) {
this.method = method;
this.arg = arg;
}
@Override
public String toString() {
return method.getClass().getCanonicalName() + "(" + arg + ")";
}
}
}
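
As a usage sketch (the host name and script path below are hypothetical, and a real run needs a resolvable host, since each method's checkArgs() runs at construction time): the configuration value holds one method per line, and NodeFencer tries them in order until one succeeds.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.ha.NodeFencer;

public class FenceDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("dfs.namenode.ha.fencing.methods",
        "sshfence(hdfs@other-nn:22, 8020)\n" +
        "shell(/usr/local/bin/cut-power.sh other-nn) # last resort\n");
    // The constructor parses the list, strips '#' comments, and calls
    // checkArgs() on every method before any fencing is attempted.
    NodeFencer fencer = new NodeFencer(conf);
    if (!fencer.fence()) {
      System.err.println("All fencing methods failed; do not fail over.");
    }
  }
}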

View File

@@ -0,0 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import com.google.common.annotations.VisibleForTesting;
/**
* Fencing method that runs a shell command. It should be specified
* in the fencing configuration like:<br>
* <code>
* shell(/path/to/my/script.sh arg1 arg2 ...)
* </code><br>
* The string between '(' and ')' is passed directly to a bash shell and
* may not include any closing parentheses.<p>
*
* The shell command will be run with an environment set up to contain
* all of the current Hadoop configuration variables, with the '_' character
* replacing any '.' characters in the configuration keys.<p>
*
* If the shell command returns an exit code of 0, the fencing is
* determined to be successful. If it returns any other exit code, the
* fencing was not successful and the next fencing method in the list
* will be attempted.<p>
*
* <em>Note:</em> this fencing method does not implement any timeout.
* If timeouts are necessary, they should be implemented in the shell
 * script itself (e.g. by forking a subshell to kill its parent in
* some number of seconds).
*/
public class ShellCommandFencer
extends Configured implements FenceMethod {
/** Length at which to abbreviate command in long messages */
private static final int ABBREV_LENGTH = 20;
@VisibleForTesting
static Log LOG = LogFactory.getLog(
ShellCommandFencer.class);
@Override
public void checkArgs(String args) throws BadFencingConfigurationException {
if (args == null || args.isEmpty()) {
throw new BadFencingConfigurationException(
"No argument passed to 'shell' fencing method");
}
// Nothing else we can really check without actually running the command
}
@Override
public boolean tryFence(String cmd) {
ProcessBuilder builder = new ProcessBuilder(
"bash", "-e", "-c", cmd);
setConfAsEnvVars(builder.environment());
Process p;
try {
p = builder.start();
p.getOutputStream().close();
} catch (IOException e) {
LOG.warn("Unable to execute " + cmd, e);
return false;
}
String pid = tryGetPid(p);
LOG.info("Launched fencing command '" + cmd + "' with "
+ ((pid != null) ? ("pid " + pid) : "unknown pid"));
String logPrefix = abbreviate(cmd, ABBREV_LENGTH);
if (pid != null) {
logPrefix = "[PID " + pid + "] " + logPrefix;
}
// Pump logs to stderr
StreamPumper errPumper = new StreamPumper(
LOG, logPrefix, p.getErrorStream(),
StreamPumper.StreamType.STDERR);
errPumper.start();
StreamPumper outPumper = new StreamPumper(
LOG, logPrefix, p.getInputStream(),
StreamPumper.StreamType.STDOUT);
outPumper.start();
int rc;
try {
rc = p.waitFor();
errPumper.join();
outPumper.join();
} catch (InterruptedException ie) {
LOG.warn("Interrupted while waiting for fencing command: " + cmd);
return false;
}
return rc == 0;
}
/**
* Abbreviate a string by putting '...' in the middle of it,
* in an attempt to keep logs from getting too messy.
* @param cmd the string to abbreviate
* @param len maximum length to abbreviate to
* @return abbreviated string
*/
static String abbreviate(String cmd, int len) {
if (cmd.length() > len && len >= 5) {
int firstHalf = (len - 3) / 2;
int rem = len - firstHalf - 3;
return cmd.substring(0, firstHalf) +
"..." + cmd.substring(cmd.length() - rem);
} else {
return cmd;
}
}
/**
* Attempt to use evil reflection tricks to determine the
* pid of a launched process. This is helpful to ops
* if debugging a fencing process that might have gone
* wrong. If running on a system or JVM where this doesn't
* work, it will simply return null.
*/
private static String tryGetPid(Process p) {
try {
Class<? extends Process> clazz = p.getClass();
if (clazz.getName().equals("java.lang.UNIXProcess")) {
Field f = clazz.getDeclaredField("pid");
f.setAccessible(true);
return String.valueOf(f.getInt(p));
} else {
LOG.trace("Unable to determine pid for " + p
+ " since it is not a UNIXProcess");
return null;
}
} catch (Throwable t) {
LOG.trace("Unable to determine pid for " + p, t);
return null;
}
}
/**
* Set the environment of the subprocess to be the Configuration,
* with '.'s replaced by '_'s.
*/
private void setConfAsEnvVars(Map<String, String> env) {
for (Map.Entry<String, String> pair : getConf()) {
env.put(pair.getKey().replace('.', '_'), pair.getValue());
}
}
}
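
A sketch of the environment propagation described in the class javadoc (the configuration key and command are arbitrary examples): each '.' in a configuration key becomes '_' in the subprocess environment, so a fencing script can read Hadoop settings as shell variables.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.ha.ShellCommandFencer;

public class ShellFenceDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("dfs.namenode.rpc-address", "other-nn:8020");
    ShellCommandFencer fencer = new ShellCommandFencer();
    fencer.setConf(conf);
    // The subprocess sees the key above as $dfs_namenode_rpc_address;
    // exit status 0 marks the fencing attempt as successful.
    boolean ok = fencer.tryFence(
        "echo fencing $dfs_namenode_rpc_address; exit 0");
    System.out.println("fenced: " + ok);
  }
}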

View File

@@ -0,0 +1,352 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import com.google.common.annotations.VisibleForTesting;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
/**
 * This fencing implementation connects to the target node via SSH and uses <code>fuser</code>
* to kill the process listening on the NameNode's TCP port. This is
* more accurate than using "jps" since it doesn't require parsing,
* and will work even if there are multiple NameNodes running on the
* same machine.<p>
* It returns a successful status code if:
* <ul>
* <li><code>fuser</code> indicates it successfully killed a process, <em>or</em>
* <li><code>nc -z</code> indicates that nothing is listening on the target port
* </ul>
* <p>
 * This fencing mechanism is configured as follows in the fencing method
 * list:
 * <code>sshfence([username@]nnhost[:ssh-port][, target-nn-port])</code>
 * where the first argument specifies the username, host, and port to ssh
 * into, and the second argument specifies the port on which the target
 * NN process is listening.
* <p>
 * For example, <code>sshfence(other-nn, 8020)</code> will SSH into
 * <code>other-nn</code> as the current user on the standard SSH port,
* then kill whatever process is listening on port 8020.
* <p>
* If no <code>target-nn-port</code> is specified, it is assumed that the
* target NameNode is listening on the same port as the local NameNode.
* <p>
* In order to achieve passwordless SSH, the operator must also configure
 * <code>dfs.namenode.ha.fencing.ssh.private-key-files</code> to point to an
* SSH key that has passphrase-less access to the given username and host.
*/
public class SshFenceByTcpPort extends Configured
implements FenceMethod {
static final Log LOG = LogFactory.getLog(
SshFenceByTcpPort.class);
static final String CONF_CONNECT_TIMEOUT_KEY =
"dfs.namenode.ha.fencing.ssh.connect-timeout";
private static final int CONF_CONNECT_TIMEOUT_DEFAULT =
30*1000;
static final String CONF_IDENTITIES_KEY =
"dfs.namenode.ha.fencing.ssh.private-key-files";
/**
* Verify that the arguments are parseable and that the host
* can be resolved.
*/
@Override
public void checkArgs(String argStr) throws BadFencingConfigurationException {
Args args = new Args(argStr);
try {
InetAddress.getByName(args.host);
} catch (UnknownHostException e) {
throw new BadFencingConfigurationException(
"Unknown host: " + args.host);
}
}
@Override
public boolean tryFence(String argsStr)
throws BadFencingConfigurationException {
Args args = new Args(argsStr);
Session session;
try {
session = createSession(args);
} catch (JSchException e) {
LOG.warn("Unable to create SSH session", e);
return false;
}
LOG.info("Connecting to " + args.host + "...");
try {
session.connect(getSshConnectTimeout());
} catch (JSchException e) {
LOG.warn("Unable to connect to " + args.host
+ " as user " + args.user, e);
return false;
}
LOG.info("Connected to " + args.host);
int targetPort = args.targetPort != null ?
args.targetPort : getDefaultNNPort();
try {
return doFence(session, targetPort);
} catch (JSchException e) {
LOG.warn("Unable to achieve fencing on remote host", e);
return false;
} finally {
session.disconnect();
}
}
private Session createSession(Args args) throws JSchException {
JSch jsch = new JSch();
for (String keyFile : getKeyFiles()) {
jsch.addIdentity(keyFile);
}
JSch.setLogger(new LogAdapter());
Session session = jsch.getSession(args.user, args.host, args.sshPort);
session.setConfig("StrictHostKeyChecking", "no");
return session;
}
private boolean doFence(Session session, int nnPort) throws JSchException {
try {
LOG.info("Looking for process running on port " + nnPort);
int rc = execCommand(session,
"PATH=$PATH:/sbin:/usr/sbin fuser -v -k -n tcp " + nnPort);
if (rc == 0) {
LOG.info("Successfully killed process that was " +
"listening on port " + nnPort);
// exit code 0 indicates the process was successfully killed.
return true;
} else if (rc == 1) {
// exit code 1 indicates either that the process was not running
// or that fuser didn't have root privileges in order to find it
// (e.g. running as a different user)
LOG.info(
"Indeterminate response from trying to kill NameNode. " +
"Verifying whether it is running using nc...");
rc = execCommand(session, "nc -z localhost 8020");
if (rc == 0) {
// the NN is still listening - we are unable to fence
LOG.warn("Unable to fence NN - it is running but we cannot kill it");
return false;
} else {
LOG.info("Verified that the NN is down.");
return true;
}
} else {
  // Any other exit code is unexpected; log it and consider this
  // fencing attempt unsuccessful.
  LOG.warn("fuser exited with unexpected code " + rc);
  return false;
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while trying to fence via ssh", e);
return false;
} catch (IOException e) {
LOG.warn("Unknown failure while trying to fence via ssh", e);
return false;
}
}
/**
* Execute a command through the ssh session, pumping its
* stderr and stdout to our own logs.
*/
private int execCommand(Session session, String cmd)
throws JSchException, InterruptedException, IOException {
LOG.debug("Running cmd: " + cmd);
ChannelExec exec = null;
try {
exec = (ChannelExec)session.openChannel("exec");
exec.setCommand(cmd);
exec.setInputStream(null);
exec.connect();
// Pump stdout of the command to our WARN logs
StreamPumper outPumper = new StreamPumper(LOG, cmd + " via ssh",
exec.getInputStream(), StreamPumper.StreamType.STDOUT);
outPumper.start();
// Pump stderr of the command to our WARN logs
StreamPumper errPumper = new StreamPumper(LOG, cmd + " via ssh",
exec.getErrStream(), StreamPumper.StreamType.STDERR);
errPumper.start();
outPumper.join();
errPumper.join();
return exec.getExitStatus();
} finally {
cleanup(exec);
}
}
private static void cleanup(ChannelExec exec) {
if (exec != null) {
try {
exec.disconnect();
} catch (Throwable t) {
LOG.warn("Couldn't disconnect ssh channel", t);
}
}
}
private int getSshConnectTimeout() {
return getConf().getInt(
CONF_CONNECT_TIMEOUT_KEY, CONF_CONNECT_TIMEOUT_DEFAULT);
}
private Collection<String> getKeyFiles() {
return getConf().getTrimmedStringCollection(CONF_IDENTITIES_KEY);
}
private int getDefaultNNPort() {
return NameNode.getAddress(getConf()).getPort();
}
/**
* Container for the parsed arg line for this fencing method.
*/
@VisibleForTesting
static class Args {
private static final Pattern USER_HOST_PORT_RE = Pattern.compile(
"(?:(.+?)@)?([^:]+?)(?:\\:(\\d+))?");
private static final int DEFAULT_SSH_PORT = 22;
final String user;
final String host;
final int sshPort;
final Integer targetPort;
public Args(String args) throws BadFencingConfigurationException {
if (args == null) {
throw new BadFencingConfigurationException(
"Must specify args for ssh fencing configuration");
}
String[] argList = args.split(",\\s*");
if (argList.length > 2 || argList.length == 0) {
throw new BadFencingConfigurationException(
"Incorrect number of arguments: " + args);
}
// Parse SSH destination.
String sshDestArg = argList[0];
Matcher m = USER_HOST_PORT_RE.matcher(sshDestArg);
if (!m.matches()) {
throw new BadFencingConfigurationException(
"Unable to parse SSH destination: "+ sshDestArg);
}
if (m.group(1) != null) {
user = m.group(1);
} else {
user = System.getProperty("user.name");
}
host = m.group(2);
if (m.group(3) != null) {
sshPort = parseConfiggedPort(m.group(3));
} else {
sshPort = DEFAULT_SSH_PORT;
}
// Parse target port.
if (argList.length > 1) {
targetPort = parseConfiggedPort(argList[1]);
} else {
targetPort = null;
}
}
private Integer parseConfiggedPort(String portStr)
throws BadFencingConfigurationException {
try {
return Integer.valueOf(portStr);
} catch (NumberFormatException nfe) {
throw new BadFencingConfigurationException(
"Port number '" + portStr + "' invalid");
}
}
}
/**
* Adapter from JSch's logger interface to our log4j
*/
private static class LogAdapter implements com.jcraft.jsch.Logger {
static final Log LOG = LogFactory.getLog(
SshFenceByTcpPort.class.getName() + ".jsch");
public boolean isEnabled(int level) {
switch (level) {
case com.jcraft.jsch.Logger.DEBUG:
return LOG.isDebugEnabled();
case com.jcraft.jsch.Logger.INFO:
return LOG.isInfoEnabled();
case com.jcraft.jsch.Logger.WARN:
return LOG.isWarnEnabled();
case com.jcraft.jsch.Logger.ERROR:
return LOG.isErrorEnabled();
case com.jcraft.jsch.Logger.FATAL:
return LOG.isFatalEnabled();
default:
return false;
}
}
public void log(int level, String message) {
switch (level) {
case com.jcraft.jsch.Logger.DEBUG:
LOG.debug(message);
break;
case com.jcraft.jsch.Logger.INFO:
LOG.info(message);
break;
case com.jcraft.jsch.Logger.WARN:
LOG.warn(message);
break;
case com.jcraft.jsch.Logger.ERROR:
LOG.error(message);
break;
case com.jcraft.jsch.Logger.FATAL:
LOG.fatal(message);
break;
}
}
}
}
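
A configuration sketch for this method (the key file path and host are hypothetical; the key must allow passphrase-less login as the given user):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.server.namenode.ha.SshFenceByTcpPort;

public class SshFenceDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("dfs.namenode.ha.fencing.ssh.private-key-files",
        "/home/hdfs/.ssh/id_rsa");
    conf.setInt("dfs.namenode.ha.fencing.ssh.connect-timeout", 10000);
    // Only consulted when the arg string omits the target NN port.
    FileSystem.setDefaultUri(conf, "hdfs://localhost:8020");
    SshFenceByTcpPort fence = new SshFenceByTcpPort();
    fence.setConf(conf);
    // [username@]host[:ssh-port], then the port the target NN listens on.
    boolean ok = fence.tryFence("hdfs@other-nn:22, 8020");
    System.out.println("fenced: " + ok);
  }
}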

View File

@@ -0,0 +1,73 @@
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.commons.logging.Log;
/**
* Class responsible for pumping the streams of the subprocess
* out to log4j. stderr is pumped to WARN level and stdout is
* pumped to INFO level
*/
class StreamPumper {
enum StreamType {
STDOUT, STDERR;
}
private final Log log;
final Thread thread;
final String logPrefix;
final StreamPumper.StreamType type;
private final InputStream stream;
private boolean started = false;
StreamPumper(final Log log, final String logPrefix,
final InputStream stream, final StreamType type) {
this.log = log;
this.logPrefix = logPrefix;
this.stream = stream;
this.type = type;
thread = new Thread(new Runnable() {
@Override
public void run() {
try {
pump();
} catch (Throwable t) {
ShellCommandFencer.LOG.warn(logPrefix +
": Unable to pump output from " + type,
t);
}
}
}, logPrefix + ": StreamPumper for " + type);
thread.setDaemon(true);
}
void join() throws InterruptedException {
assert started;
thread.join();
}
void start() {
assert !started;
thread.start();
started = true;
}
protected void pump() throws IOException {
InputStreamReader inputStreamReader = new InputStreamReader(stream);
BufferedReader br = new BufferedReader(inputStreamReader);
String line = null;
while ((line = br.readLine()) != null) {
if (type == StreamType.STDOUT) {
log.info(logPrefix + ": " + line);
} else {
log.warn(logPrefix + ": " + line);
}
}
}
}
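
A usage sketch for this helper (placed in the same package, since the class is package-private; the command is an arbitrary example): stdout lines land at INFO and stderr lines at WARN, each prefixed with the given log prefix.

package org.apache.hadoop.hdfs.server.namenode.ha;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

class StreamPumperDemo {
  private static final Log LOG = LogFactory.getLog(StreamPumperDemo.class);

  public static void main(String[] args) throws Exception {
    Process p = new ProcessBuilder("bash", "-c",
        "echo making progress; echo oops >&2").start();
    p.getOutputStream().close();
    StreamPumper out = new StreamPumper(LOG, "demo",
        p.getInputStream(), StreamPumper.StreamType.STDOUT);  // INFO
    StreamPumper err = new StreamPumper(LOG, "demo",
        p.getErrorStream(), StreamPumper.StreamType.STDERR);  // WARN
    out.start();
    err.start();
    p.waitFor();
    out.join();  // join() asserts that start() was called first
    err.join();
  }
}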

View File

@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
public class TestNodeFencer {
@Before
public void clearMockState() {
AlwaysSucceedFencer.fenceCalled = 0;
AlwaysSucceedFencer.callArgs.clear();
AlwaysFailFencer.fenceCalled = 0;
AlwaysFailFencer.callArgs.clear();
}
@Test
public void testSingleFencer() throws BadFencingConfigurationException {
NodeFencer fencer = setupFencer(
AlwaysSucceedFencer.class.getName() + "(foo)");
assertTrue(fencer.fence());
assertEquals(1, AlwaysSucceedFencer.fenceCalled);
assertEquals("foo", AlwaysSucceedFencer.callArgs.get(0));
}
@Test
public void testMultipleFencers() throws BadFencingConfigurationException {
NodeFencer fencer = setupFencer(
AlwaysSucceedFencer.class.getName() + "(foo)\n" +
AlwaysSucceedFencer.class.getName() + "(bar)\n");
assertTrue(fencer.fence());
// Only one call, since the first fencer succeeds
assertEquals(1, AlwaysSucceedFencer.fenceCalled);
assertEquals("foo", AlwaysSucceedFencer.callArgs.get(0));
}
@Test
public void testWhitespaceAndCommentsInConfig()
throws BadFencingConfigurationException {
NodeFencer fencer = setupFencer(
"\n" +
" # the next one will always fail\n" +
" " + AlwaysFailFencer.class.getName() + "(foo) # <- fails\n" +
AlwaysSucceedFencer.class.getName() + "(bar) \n");
assertTrue(fencer.fence());
// One call to each, since top fencer fails
assertEquals(1, AlwaysFailFencer.fenceCalled);
assertEquals(1, AlwaysSucceedFencer.fenceCalled);
assertEquals("foo", AlwaysFailFencer.callArgs.get(0));
assertEquals("bar", AlwaysSucceedFencer.callArgs.get(0));
}
@Test
public void testArglessFencer() throws BadFencingConfigurationException {
NodeFencer fencer = setupFencer(
AlwaysSucceedFencer.class.getName());
assertTrue(fencer.fence());
// Only one fencer configured, and it was called with null args
assertEquals(1, AlwaysSucceedFencer.fenceCalled);
assertEquals(null, AlwaysSucceedFencer.callArgs.get(0));
}
@Test
public void testShortName() throws BadFencingConfigurationException {
NodeFencer fencer = setupFencer("shell(true)");
assertTrue(fencer.fence());
}
private NodeFencer setupFencer(String confStr)
throws BadFencingConfigurationException {
System.err.println("Testing configuration:\n" + confStr);
Configuration conf = new Configuration();
conf.set(NodeFencer.CONF_METHODS_KEY,
confStr);
return new NodeFencer(conf);
}
/**
* Mock fencing method that always returns true
*/
public static class AlwaysSucceedFencer extends Configured
implements FenceMethod {
static int fenceCalled = 0;
static List<String> callArgs = Lists.newArrayList();
@Override
public boolean tryFence(String args) {
callArgs.add(args);
fenceCalled++;
return true;
}
@Override
public void checkArgs(String args) {
}
}
/**
* Identical mock to above, except always returns false
*/
public static class AlwaysFailFencer extends Configured
implements FenceMethod {
static int fenceCalled = 0;
static List<String> callArgs = Lists.newArrayList();
@Override
public boolean tryFence(String args) {
callArgs.add(args);
fenceCalled++;
return false;
}
@Override
public void checkArgs(String args) {
}
}
}

View File

@@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import static org.mockito.Mockito.spy;
public class TestShellCommandFencer {
private ShellCommandFencer fencer = createFencer();
@BeforeClass
public static void setupLogSpy() {
ShellCommandFencer.LOG = spy(ShellCommandFencer.LOG);
}
@Before
public void resetLogSpy() {
Mockito.reset(ShellCommandFencer.LOG);
}
private static ShellCommandFencer createFencer() {
Configuration conf = new Configuration();
conf.set("in.fencing.tests", "yessir");
ShellCommandFencer fencer = new ShellCommandFencer();
fencer.setConf(conf);
return fencer;
}
/**
* Test that the exit code of the script determines
* whether the fencer succeeded or failed
*/
@Test
public void testBasicSuccessFailure() {
assertTrue(fencer.tryFence("exit 0"));
assertFalse(fencer.tryFence("exit 1"));
// bad path should also fail
assertFalse(fencer.tryFence("xxxxxxxxxxxx"));
}
@Test
public void testCheckArgs() {
try {
Configuration conf = new Configuration();
conf.set(NodeFencer.CONF_METHODS_KEY, "shell");
new NodeFencer(conf);
fail("Didn't throw when passing no args to shell");
} catch (BadFencingConfigurationException confe) {
GenericTestUtils.assertExceptionContains(
"No argument passed", confe);
}
}
/**
* Test that lines on stdout get passed as INFO
* level messages
*/
@Test
public void testStdoutLogging() {
assertTrue(fencer.tryFence("echo hello"));
Mockito.verify(ShellCommandFencer.LOG).info(
Mockito.endsWith("echo hello: hello"));
}
/**
* Test that lines on stderr get passed as
* WARN level log messages
*/
@Test
public void testStderrLogging() {
assertTrue(fencer.tryFence("echo hello >&2"));
Mockito.verify(ShellCommandFencer.LOG).warn(
Mockito.endsWith("echo hello >&2: hello"));
}
/**
* Verify that the Configuration gets passed as
* environment variables to the fencer.
*/
@Test
public void testConfAsEnvironment() {
fencer.tryFence("echo $in_fencing_tests");
Mockito.verify(ShellCommandFencer.LOG).info(
Mockito.endsWith("echo $in...ing_tests: yessir"));
}
/**
* Test that we properly close off our input to the subprocess
* such that it knows there's no tty connected. This is important
* so that, if we use 'ssh', it won't try to prompt for a password
* and block forever, for example.
*/
@Test(timeout=10000)
public void testSubprocessInputIsClosed() {
assertFalse(fencer.tryFence("read"));
}
@Test
public void testCommandAbbreviation() {
assertEquals("a...f", ShellCommandFencer.abbreviate("abcdef", 5));
assertEquals("abcdef", ShellCommandFencer.abbreviate("abcdef", 6));
assertEquals("abcdef", ShellCommandFencer.abbreviate("abcdef", 7));
assertEquals("a...g", ShellCommandFencer.abbreviate("abcdefg", 5));
assertEquals("a...h", ShellCommandFencer.abbreviate("abcdefgh", 5));
assertEquals("a...gh", ShellCommandFencer.abbreviate("abcdefgh", 6));
assertEquals("ab...gh", ShellCommandFencer.abbreviate("abcdefgh", 7));
}
}

View File

@@ -0,0 +1,102 @@
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.server.namenode.ha.SshFenceByTcpPort.Args;
import org.apache.log4j.Level;
import org.junit.Assume;
import org.junit.Test;
public class TestSshFenceByTcpPort {
static {
((Log4JLogger)SshFenceByTcpPort.LOG).getLogger().setLevel(Level.ALL);
}
private final String TEST_FENCING_ARG = System.getProperty(
"test.TestSshFenceByTcpPort.arg", "localhost");
private final String TEST_KEYFILE = System.getProperty(
"test.TestSshFenceByTcpPort.key");
@Test(timeout=20000)
public void testFence() throws BadFencingConfigurationException {
Assume.assumeTrue(isConfigured());
Configuration conf = new Configuration();
conf.set(SshFenceByTcpPort.CONF_IDENTITIES_KEY, TEST_KEYFILE);
FileSystem.setDefaultUri(conf, "localhost:8020");
SshFenceByTcpPort fence = new SshFenceByTcpPort();
fence.setConf(conf);
assertTrue(fence.tryFence(TEST_FENCING_ARG));
}
/**
* Test connecting to a host which definitely won't respond.
* Make sure that it times out and returns false, but doesn't throw
* any exception
*/
@Test(timeout=20000)
public void testConnectTimeout() throws BadFencingConfigurationException {
Configuration conf = new Configuration();
conf.setInt(SshFenceByTcpPort.CONF_CONNECT_TIMEOUT_KEY, 3000);
SshFenceByTcpPort fence = new SshFenceByTcpPort();
fence.setConf(conf);
// Connect to Google's DNS server - not running ssh!
assertFalse(fence.tryFence("8.8.8.8"));
}
@Test
public void testArgsParsing() throws BadFencingConfigurationException {
Args args = new SshFenceByTcpPort.Args("foo@bar.com:1234");
assertEquals("foo", args.user);
assertEquals("bar.com", args.host);
assertEquals(1234, args.sshPort);
assertNull(args.targetPort);
args = new SshFenceByTcpPort.Args("foo@bar.com");
assertEquals("foo", args.user);
assertEquals("bar.com", args.host);
assertEquals(22, args.sshPort);
assertNull(args.targetPort);
args = new SshFenceByTcpPort.Args("bar.com");
assertEquals(System.getProperty("user.name"), args.user);
assertEquals("bar.com", args.host);
assertEquals(22, args.sshPort);
assertNull(args.targetPort);
args = new SshFenceByTcpPort.Args("bar.com:1234, 12345");
assertEquals(System.getProperty("user.name"), args.user);
assertEquals("bar.com", args.host);
assertEquals(1234, args.sshPort);
assertEquals(Integer.valueOf(12345), args.targetPort);
args = new SshFenceByTcpPort.Args("bar, 8020");
assertEquals(Integer.valueOf(8020), args.targetPort);
}
@Test
public void testBadArgsParsing() throws BadFencingConfigurationException {
assertBadArgs(null);
assertBadArgs("");
assertBadArgs("bar.com:");
assertBadArgs("bar.com:x");
assertBadArgs("foo.com, x");
}
private void assertBadArgs(String argStr) {
try {
new Args(argStr);
fail("Did not fail on bad args: " + argStr);
} catch (BadFencingConfigurationException e) {
// expected
}
}
private boolean isConfigured() {
return (TEST_FENCING_ARG != null && !TEST_FENCING_ARG.isEmpty()) &&
(TEST_KEYFILE != null && !TEST_KEYFILE.isEmpty());
}
}