HADOOP-8279. Allow manual failover to be invoked when auto-failover is enabled. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1333288 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-05-03 01:56:33 +00:00
parent c6e1321245
commit 9d5799553f
26 changed files with 1409 additions and 91 deletions

View File

@ -21,3 +21,5 @@ HADOOP-8246. Auto-HA: automatically scope znode by nameservice ID (todd)
HADOOP-8247. Add a config to enable auto-HA, which disables manual FailoverController (todd)
HADOOP-8306. ZKFC: improve error message when ZK is not running. (todd)
HADOOP-8279. Allow manual failover to be invoked when auto-failover is enabled. (todd)

View File

@ -116,6 +116,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
"security.refresh.user.mappings.protocol.acl";
public static final String
SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl";
public static final String
SECURITY_ZKFC_PROTOCOL_ACL = "security.zkfc.protocol.acl";
public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP =
"hadoop.security.token.service.use_ip";

View File

@ -378,7 +378,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
createConnection();
}
Stat stat = new Stat();
return zkClient.getData(zkLockFilePath, false, stat);
return getDataWithRetries(zkLockFilePath, false, stat);
} catch(KeeperException e) {
Code code = e.code();
if (isNodeDoesNotExist(code)) {
@ -889,6 +889,15 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
});
}
/**
 * Reads a znode through {@code zkClient.getData}, with the call wrapped in a
 * {@code ZKAction} and executed by {@code zkDoWithRetries}.
 *
 * @param path  znode path handed to {@code zkClient.getData}
 * @param watch watch flag handed to {@code zkClient.getData}
 * @param stat  stat holder handed to {@code zkClient.getData}
 * @return the bytes returned by {@code zkClient.getData}
 * @throws InterruptedException if the underlying call is interrupted
 * @throws KeeperException if the underlying call reports a ZooKeeper error
 */
private byte[] getDataWithRetries(final String path, final boolean watch,
    final Stat stat) throws InterruptedException, KeeperException {
  ZKAction<byte[]> readAction = new ZKAction<byte[]>() {
    public byte[] run() throws KeeperException, InterruptedException {
      return zkClient.getData(path, watch, stat);
    }
  };
  return zkDoWithRetries(readAction);
}
private Stat setDataWithRetries(final String path, final byte[] data,
final int version) throws InterruptedException, KeeperException {
return zkDoWithRetries(new ZKAction<Stat>() {

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
/**
@ -201,9 +202,26 @@ public abstract class HAAdmin extends Configured implements Tool {
HAServiceTarget fromNode = resolveTarget(args[0]);
HAServiceTarget toNode = resolveTarget(args[1]);
if (!checkManualStateManagementOK(fromNode) ||
!checkManualStateManagementOK(toNode)) {
return -1;
// Check that auto-failover is consistently configured for both nodes.
Preconditions.checkState(
fromNode.isAutoFailoverEnabled() ==
toNode.isAutoFailoverEnabled(),
"Inconsistent auto-failover configs between %s and %s!",
fromNode, toNode);
if (fromNode.isAutoFailoverEnabled()) {
if (forceFence || forceActive) {
// -forceActive doesn't make sense with auto-HA, since, if the node
// is not healthy, then its ZKFC will immediately quit the election
// again the next time a health check runs.
//
// -forceFence doesn't seem to have any real use cases with auto-HA
// so it isn't implemented.
errOut.println(FORCEFENCE + " and " + FORCEACTIVE + " flags not " +
"supported with auto-failover enabled.");
return -1;
}
return gracefulFailoverThroughZKFCs(toNode);
}
FailoverController fc = new FailoverController(getConf(),
@ -218,6 +236,31 @@ public abstract class HAAdmin extends Configured implements Tool {
}
return 0;
}
/**
 * Performs a graceful failover by delegating to the ZKFC of the destination
 * node: a single {@code gracefulFailover} RPC is sent to that ZKFC and the
 * outcome is printed on this tool's output streams.
 *
 * @param toNode the node to fail to
 * @return 0 when the RPC returns normally, -1 when it raises a
 *         {@link ServiceFailedException}
 * @throws IOException if the proxy cannot be obtained or the RPC fails with
 *         any other I/O error
 */
private int gracefulFailoverThroughZKFCs(HAServiceTarget toNode)
    throws IOException {
  int rpcTimeoutMs = FailoverController.getRpcTimeoutToNewActive(getConf());
  ZKFCProtocol zkfc = toNode.getZKFCProxy(getConf(), rpcTimeoutMs);

  try {
    zkfc.gracefulFailover();
  } catch (ServiceFailedException sfe) {
    // The ZKFC reported that the failover did not go through.
    errOut.println("Failover failed: " + sfe.getLocalizedMessage());
    return -1;
  }

  out.println("Failover to " + toNode + " successful");
  return 0;
}
private int checkHealth(final CommandLine cmd)
throws IOException, ServiceFailedException {

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.ha.protocolPB.ZKFCProtocolClientSideTranslatorPB;
import org.apache.hadoop.net.NetUtils;
import com.google.common.collect.Maps;
@ -48,6 +49,11 @@ public abstract class HAServiceTarget {
*/
public abstract InetSocketAddress getAddress();
/**
* Returns the address at which the ZKFC for this target can be reached.
* This is the address that {@link #getZKFCProxy(Configuration, int)}
* connects to.
*
* @return the IPC address of the ZKFC on the target node
*/
public abstract InetSocketAddress getZKFCAddress();
/**
* @return a Fencer implementation configured for this target node
*/
@ -76,6 +82,20 @@ public abstract class HAServiceTarget {
confCopy, factory, timeoutMs);
}
/**
 * Builds an RPC client for the ZKFC that is associated with this HA service.
 *
 * @param conf base configuration; it is copied and the copy is adjusted, so
 *        the caller's instance is not modified
 * @param timeoutMs timeout value handed to the client-side translator
 * @return a proxy to the ZKFC which is associated with this HA service.
 * @throws IOException if the proxy cannot be created
 */
public ZKFCProtocol getZKFCProxy(Configuration conf, int timeoutMs)
    throws IOException {
  Configuration zkfcConf = new Configuration(conf);
  // Cap connect retries at one so an unreachable ZKFC fails quickly.
  zkfcConf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
  SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(zkfcConf);
  InetSocketAddress zkfcAddr = getZKFCAddress();
  return new ZKFCProtocolClientSideTranslatorPB(
      zkfcAddr, zkfcConf, socketFactory, timeoutMs);
}
public final Map<String, String> getFencingParameters() {
Map<String, String> ret = Maps.newHashMap();
addFencingParameters(ret);

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.KerberosInfo;
import java.io.IOException;
/**
 * RPC protocol served by the ZKFailoverController. Its two operations are
 * the building blocks of a graceful (manually requested) failover.
 */
@KerberosInfo(
    serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ZKFCProtocol {

  /**
   * Initial version of the protocol
   */
  long versionID = 1L;

  /**
   * Asks this ZKFC to stay out of the active node election for the given
   * period of time.
   *
   * <ul>
   * <li>A node that is not currently active is simply prevented from trying
   * to become active during the period.</li>
   * <li>A node that is active first tries to move its local service to
   * standby and then quits the election.</li>
   * </ul>
   *
   * When the transition to standby succeeds, the ZKFC handling this RPC
   * removes its own breadcrumb node from ZooKeeper, so the next active does
   * not run any fencing process. When it does not succeed, the breadcrumb is
   * left in place and the next active will fence this node.
   *
   * Once the period has elapsed the node tries to re-join the election,
   * provided that its service is healthy.
   *
   * A later call made while an earlier period is still running takes
   * precedence and resets the timer. Passing a 0 or negative period lets the
   * node rejoin the election immediately, so long as it is healthy.
   *
   * @param millisToCede period for which the node should not attempt to
   *                     become active
   * @throws IOException if the operation fails
   * @throws AccessControlException if the operation is disallowed
   */
  @Idempotent
  void cedeActive(int millisToCede)
      throws IOException, AccessControlException;

  /**
   * Asks this node to become active by way of a graceful failover.
   *
   * <ul>
   * <li>Already active: no-op, success is returned without further
   * action.</li>
   * <li>Not healthy: an exception is thrown indicating that the node cannot
   * become active.</li>
   * <li>Healthy and not active: the node tries to initiate a graceful
   * failover to itself and returns only once it has successfully become
   * active. See {@link ZKFailoverController#gracefulFailoverToYou()} for the
   * implementation details.</li>
   * </ul>
   *
   * If the failover cannot be coordinated successfully, an exception that
   * states the reason is thrown.
   *
   * @throws IOException if graceful failover fails
   * @throws AccessControlException if the operation is disallowed
   */
  @Idempotent
  void gracefulFailover()
      throws IOException, AccessControlException;
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.ZKFCProtocolService;
import org.apache.hadoop.ha.protocolPB.ZKFCProtocolPB;
import org.apache.hadoop.ha.protocolPB.ZKFCProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import com.google.protobuf.BlockingService;
/**
 * RPC endpoint through which a {@link ZKFailoverController} serves
 * {@link ZKFCProtocol}. Each incoming call is first passed through the
 * controller's admin access check and then delegated to the controller.
 */
@InterfaceAudience.LimitedPrivate("HDFS")
@InterfaceStability.Evolving
public class ZKFCRpcServer implements ZKFCProtocol {

  /** Number of handlers requested from the RPC server. */
  private static final int HANDLER_COUNT = 3;

  /** Controller that actually carries out the requests. */
  private final ZKFailoverController zkfc;

  /** Underlying RPC server; built in the constructor, started by start(). */
  private Server server;

  /**
   * Builds the RPC server. It is not started until {@link #start()} is
   * called.
   *
   * @param conf configuration used for the protocol engine, the server and
   *        the authorization switch
   * @param bindAddr host name and port to bind to
   * @param zkfc controller that requests are delegated to
   * @param policy policy applied when service-level authorization is enabled
   * @throws IOException if the server cannot be created
   */
  ZKFCRpcServer(Configuration conf,
      InetSocketAddress bindAddr,
      ZKFailoverController zkfc,
      PolicyProvider policy) throws IOException {
    this.zkfc = zkfc;

    RPC.setProtocolEngine(conf, ZKFCProtocolPB.class, ProtobufRpcEngine.class);
    BlockingService service = ZKFCProtocolService.newReflectiveBlockingService(
        new ZKFCProtocolServerSideTranslatorPB(this));
    server = RPC.getServer(
        ZKFCProtocolPB.class, service,
        bindAddr.getHostName(), bindAddr.getPort(),
        HANDLER_COUNT, false, conf,
        null /*secretManager*/);

    // set service-level authorization security policy
    boolean authorizationEnabled = conf.getBoolean(
        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
    if (authorizationEnabled) {
      server.refreshServiceAcl(conf, policy);
    }
  }

  /** Starts the underlying RPC server. */
  void start() {
    server.start();
  }

  /**
   * @return the listener address reported by the underlying RPC server
   */
  public InetSocketAddress getAddress() {
    return server.getListenerAddress();
  }

  /**
   * Stops the underlying RPC server and then joins it.
   *
   * @throws InterruptedException if interrupted while joining
   */
  void stopAndJoin() throws InterruptedException {
    server.stop();
    server.join();
  }

  /** Access-checks the caller, then forwards to the controller. */
  @Override
  public void cedeActive(int millisToCede)
      throws IOException, AccessControlException {
    zkfc.checkRpcAdminAccess();
    zkfc.cedeActive(millisToCede);
  }

  /** Access-checks the caller, then forwards to the controller. */
  @Override
  public void gracefulFailover()
      throws IOException, AccessControlException {
    zkfc.checkRpcAdminAccess();
    zkfc.gracefulFailoverToYou();
  }
}

View File

@ -18,21 +18,32 @@
package org.apache.hadoop.ha;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
@ -41,6 +52,8 @@ import org.apache.zookeeper.data.ACL;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@InterfaceAudience.LimitedPrivate("HDFS")
public abstract class ZKFailoverController implements Tool {
@ -85,6 +98,7 @@ public abstract class ZKFailoverController implements Tool {
private HealthMonitor healthMonitor;
private ActiveStandbyElector elector;
protected ZKFCRpcServer rpcServer;
private HAServiceTarget localTarget;
@ -93,6 +107,22 @@ public abstract class ZKFailoverController implements Tool {
/** Set if a fatal error occurs */
private String fatalError = null;
/**
* A future nanotime before which the ZKFC will not join the election.
* This is used during graceful failover.
* The value is compared against {@link System#nanoTime()}; 0 is the value
* written when no delay is requested.
*/
private long delayJoiningUntilNanotime = 0;
/**
* Executor on which {@link #scheduleRecheck(long)} schedules events.
* It is a single-thread scheduled pool whose thread is a daemon named
* "ZKFC Delay timer #n".
*/
private ScheduledExecutorService delayExecutor =
Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ZKFC Delay timer #%d")
.build());
/**
* Outcome of the most recent attempt to become active, or null if none has
* been recorded. Read and written while holding
* {@link #activeAttemptRecordLock}.
*/
private ActiveAttemptRecord lastActiveAttemptRecord;
/**
* Monitor guarding {@link #lastActiveAttemptRecord}; it is notified each
* time a new record is stored.
*/
private Object activeAttemptRecordLock = new Object();
@Override
public void setConf(Configuration conf) {
this.conf = conf;
@ -104,6 +134,10 @@ public abstract class ZKFailoverController implements Tool {
protected abstract HAServiceTarget getLocalTarget();
protected abstract HAServiceTarget dataToTarget(byte[] data);
protected abstract void loginAsFCUser() throws IOException;
/**
* Access check that {@link ZKFCRpcServer} runs before delegating each
* incoming RPC to this controller.
*
* @throws AccessControlException if the caller is not allowed to proceed
* @throws IOException if the check itself fails
*/
protected abstract void checkRpcAdminAccess()
throws AccessControlException, IOException;
/**
* @return the address handed to {@link ZKFCRpcServer} as its bind address
*/
protected abstract InetSocketAddress getRpcAddressToBindTo();
/**
* @return the policy provider handed to {@link ZKFCRpcServer}, which applies
* it when service-level authorization is enabled
*/
protected abstract PolicyProvider getPolicyProvider();
/**
* Return the name of a znode inside the configured parent znode in which
@ -194,10 +228,14 @@ public abstract class ZKFailoverController implements Tool {
return ERR_CODE_NO_FENCER;
}
initRPC();
initHM();
startRPC();
try {
mainLoop();
} finally {
rpcServer.stopAndJoin();
elector.quitElection(true);
healthMonitor.shutdown();
healthMonitor.join();
@ -262,6 +300,16 @@ public abstract class ZKFailoverController implements Tool {
healthMonitor.addCallback(new HealthCallbacks());
healthMonitor.start();
}
/**
 * Creates the ZKFC RPC server, bound to the address supplied by
 * {@link #getRpcAddressToBindTo()} and using the policy from
 * {@link #getPolicyProvider()}. The server is created but not started.
 *
 * @throws IOException if the server cannot be created
 */
protected void initRPC() throws IOException {
  rpcServer = new ZKFCRpcServer(
      conf, getRpcAddressToBindTo(), this, getPolicyProvider());
}
/**
* Starts the RPC server held in {@code rpcServer}, which
* {@link #initRPC()} creates.
*
* @throws IOException declared in the signature; the visible body does not
* throw it itself
*/
protected void startRPC() throws IOException {
rpcServer.start();
}
private void initZK() throws HadoopIllegalArgumentException, IOException {
zkQuorum = conf.get(ZK_QUORUM_KEY);
@ -328,10 +376,18 @@ public abstract class ZKFailoverController implements Tool {
HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
conf, FailoverController.getRpcTimeoutToNewActive(conf)),
createReqInfo());
LOG.info("Successfully transitioned " + localTarget +
" to active state");
String msg = "Successfully transitioned " + localTarget +
" to active state";
LOG.info(msg);
recordActiveAttempt(new ActiveAttemptRecord(true, msg));
} catch (Throwable t) {
LOG.fatal("Couldn't make " + localTarget + " active", t);
String msg = "Couldn't make " + localTarget + " active";
LOG.fatal(msg, t);
recordActiveAttempt(new ActiveAttemptRecord(false, msg + "\n" +
StringUtils.stringifyException(t)));
if (t instanceof ServiceFailedException) {
throw (ServiceFailedException)t;
} else {
@ -350,6 +406,69 @@ public abstract class ZKFailoverController implements Tool {
}
}
/**
* Store the results of the last attempt to become active.
* This is used so that, during manually initiated failover,
* we can report back the results of the attempt to become active
* to the initiator of the failover.
*
* The new record replaces any previously stored one, and all threads
* waiting on {@code activeAttemptRecordLock} are woken up.
*
* @param record outcome of the attempt that just finished
*/
private void recordActiveAttempt(
ActiveAttemptRecord record) {
synchronized (activeAttemptRecordLock) {
lastActiveAttemptRecord = record;
// Wake any thread blocked in waitForActiveAttempt().
activeAttemptRecordLock.notifyAll();
}
}
/**
 * Wait until one of the following events:
 * <ul>
 * <li>Another thread publishes the results of an attempt to become active
 * using {@link #recordActiveAttempt(ActiveAttemptRecord)}</li>
 * <li>The node enters bad health status</li>
 * <li>The specified timeout elapses</li>
 * </ul>
 *
 * Only a record created at or after the moment this method was entered is
 * accepted; an older record left over from a previous attempt is ignored.
 * The published record is always re-checked before the timeout is honoured,
 * so a record that arrives during the final wait slice is still returned.
 * A zero or negative timeout performs the checks once and returns without
 * waiting.
 *
 * @param timeoutMillis number of millis to wait
 * @return the published record, or null if the timeout elapses or the
 *         service becomes unhealthy
 * @throws InterruptedException if the thread is interrupted.
 */
private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis)
    throws InterruptedException {
  final long st = System.nanoTime();
  final long waitUntil = st + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);

  while (true) {
    // periodically check health state, because entering an
    // unhealthy state could prevent us from ever attempting to
    // become active. We can detect this and respond to the user
    // immediately.
    synchronized (this) {
      if (lastHealthState != State.SERVICE_HEALTHY) {
        // early out if service became unhealthy
        return null;
      }
    }

    synchronized (activeAttemptRecordLock) {
      // nanoTime values must be compared through their difference, never
      // directly, because the counter is allowed to overflow.
      if (lastActiveAttemptRecord != null &&
          lastActiveAttemptRecord.nanoTime - st >= 0) {
        return lastActiveAttemptRecord;
      }

      long remainingNanos = waitUntil - System.nanoTime();
      if (remainingNanos <= 0) {
        // Timeout elapsed.
        break;
      }

      // Wait at most 1sec so that we periodically recheck the health state
      // above, and never longer than the time that is left. The lower bound
      // of 1ms matters because wait(0) would block indefinitely.
      long waitMillis = Math.min(1000L,
          Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
      activeAttemptRecordLock.wait(waitMillis);
    }
  }

  LOG.warn(timeoutMillis + "ms timeout elapsed waiting for an attempt " +
      "to become active");
  return null;
}
/**
* @return a new request descriptor whose source is
* {@link RequestSource#REQUEST_BY_ZKFC}; it accompanies the state
* transitions that this ZKFC requests from the local service
*/
private StateChangeRequestInfo createReqInfo() {
return new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC);
}
@ -369,6 +488,304 @@ public abstract class ZKFailoverController implements Tool {
// at the same time.
}
}
/**
 * Fences the node described by the given election data. Any failure is
 * published as an unsuccessful active attempt, including the stringified
 * cause, and is then rethrown through {@code Throwables.propagate}.
 *
 * @param data serialized description of the old active, decoded with
 *        {@link #dataToTarget(byte[])}
 */
private synchronized void fenceOldActive(byte[] data) {
  HAServiceTarget oldActive = dataToTarget(data);

  try {
    doFence(oldActive);
  } catch (Throwable t) {
    String failure = "Unable to fence old active: " +
        StringUtils.stringifyException(t);
    recordActiveAttempt(new ActiveAttemptRecord(false, failure));
    throw Throwables.propagate(t);
  }
}
/**
 * Fences the given target. A graceful fence is attempted first; only if
 * that does not work is the configured fencer invoked.
 *
 * @param target the node to fence
 * @throws RuntimeException if fencing is not configured correctly, or if
 *         the fencer reports failure
 */
private void doFence(HAServiceTarget target) {
  LOG.info("Should fence: " + target);

  FailoverController gracefulFencer =
      new FailoverController(conf, RequestSource.REQUEST_BY_ZKFC);
  if (gracefulFencer.tryGracefulFence(target)) {
    // It's possible that it's in standby but just about to go into active,
    // no? Is there some race here?
    LOG.info("Successfully transitioned " + target + " to standby " +
        "state without fencing");
    return;
  }

  try {
    target.checkFencingConfigured();
  } catch (BadFencingConfigurationException e) {
    LOG.error("Couldn't fence old active " + target, e);
    recordActiveAttempt(
        new ActiveAttemptRecord(false, "Unable to fence old active"));
    throw new RuntimeException(e);
  }

  boolean fenced = target.getFencer().fence(target);
  if (!fenced) {
    throw new RuntimeException("Unable to fence " + target);
  }
}
/**
 * Handles a graceful-failover request to give up the active role: the local
 * node is moved to standby and this ZKFC then leaves the election for the
 * requested time, rejoining afterwards iff it is healthy. The work runs as
 * the login user.
 *
 * @param millisToCede how long to stay out of the election
 * @throws IOException if the operation fails; an interruption is reported
 *         wrapped in an IOException
 */
void cedeActive(final int millisToCede)
    throws AccessControlException, ServiceFailedException, IOException {
  try {
    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
    PrivilegedExceptionAction<Void> cedeAction =
        new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            doCedeActive(millisToCede);
            return null;
          }
        };
    loginUser.doAs(cedeAction);
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
}
/**
* Implementation of {@link #cedeActive(int)}.
*
* With a zero or negative period the join delay is cleared and
* electability is rechecked immediately. Otherwise the local service is
* asked to transition to standby, the join delay is set to now plus the
* requested period, the election is quit, and electability is rechecked.
*
* @param millisToCede how long to stay out of the election; a value of 0
* or less cancels any delay currently in force
*/
private void doCedeActive(int millisToCede)
throws AccessControlException, ServiceFailedException, IOException {
// RPC timeout for the transition-to-standby call below.
int timeout = FailoverController.getGracefulFenceTimeout(conf);
// Lock elector to maintain lock ordering of elector -> ZKFC
synchronized (elector) {
synchronized (this) {
if (millisToCede <= 0) {
// Cancel any pending delay and re-evaluate right away.
delayJoiningUntilNanotime = 0;
recheckElectability();
return;
}
LOG.info("Requested by " + UserGroupInformation.getCurrentUser() +
" at " + Server.getRemoteAddress() + " to cede active role.");
boolean needFence = false;
try {
localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
LOG.info("Successfully ensured local node is in standby mode");
} catch (IOException ioe) {
// The failure is not propagated to the caller; instead the election is
// quit with the fencing flag set.
LOG.warn("Unable to transition local node to standby: " +
ioe.getLocalizedMessage());
LOG.warn("Quitting election but indicating that fencing is " +
"necessary");
needFence = true;
}
// Set the delay before quitting, both under the same pair of locks.
delayJoiningUntilNanotime = System.nanoTime() +
TimeUnit.MILLISECONDS.toNanos(millisToCede);
elector.quitElection(needFence);
}
}
// Called after both locks are released; while the delay set above is still
// in force, recheckElectability() only schedules a later recheck.
recheckElectability();
}
/**
 * Coordinates a graceful failover that makes this node the active one. The
 * work is carried out by {@code doGracefulFailover()}, run as the login
 * user.
 *
 * @throws ServiceFailedException if the node fails to become active
 * @throws IOException some other error occurs; an interruption is reported
 *         wrapped in an IOException
 */
void gracefulFailoverToYou() throws ServiceFailedException, IOException {
  try {
    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
    PrivilegedExceptionAction<Void> failoverAction =
        new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            doGracefulFailover();
            return null;
          }
        };
    loginUser.doAs(failoverAction);
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
}
/**
 * Runs a graceful failover to the local node in five phases:
 * <ol>
 * <li>Pre-flight: verify that the local node is healthy and therefore a
 *     valid failover target.</li>
 * <li>Look up the current active node. If it is the local node there is
 *     nothing to do and the method returns successfully.</li>
 * <li>Ask the current active to stay out of the election for a while.</li>
 * <li>Let the regular election run on other threads, and wait until either
 *     the local node becomes unhealthy or the regular code path records an
 *     attempt to become active.</li>
 * <li>Tell the old active that it may rejoin the election, so that a later
 *     fail-back remains possible.</li>
 * </ol>
 *
 * @throws ServiceFailedException if no node is currently active, if no
 *         attempt to become active was observed, or if the attempt failed
 * @throws IOException if communication with ZooKeeper or the other ZKFC fails
 * @throws InterruptedException if interrupted while waiting
 */
private void doGracefulFailover()
    throws ServiceFailedException, IOException, InterruptedException {
  final int cedeMillis = FailoverController.getGracefulFenceTimeout(conf) * 2;

  // Phase 1: pre-flight checks
  checkEligibleForFailover();

  // Phase 2: determine old/current active node. Check that we're not
  // ourselves active, etc.
  final HAServiceTarget currentActive = getCurrentActive();
  if (currentActive == null) {
    // No node is currently active. So, if we aren't already
    // active ourselves by means of a normal election, then there's
    // probably something preventing us from becoming active.
    throw new ServiceFailedException(
        "No other node is currently active.");
  }
  if (currentActive.getAddress().equals(localTarget.getAddress())) {
    LOG.info("Local node " + localTarget + " is already active. " +
        "No need to failover. Returning success.");
    return;
  }

  // Phase 3: ask the old active to yield from the election.
  LOG.info("Asking " + currentActive + " to cede its active state for " +
      cedeMillis + "ms");
  final ZKFCProtocol remoteZkfc =
      currentActive.getZKFCProxy(conf, cedeMillis);
  remoteZkfc.cedeActive(cedeMillis);

  // Phase 4: wait for the normal election to make the local node
  // active.
  final ActiveAttemptRecord attempt =
      waitForActiveAttempt(cedeMillis + 60000);
  if (attempt == null) {
    // We didn't even make an attempt to become active.
    boolean unhealthy;
    synchronized (this) {
      unhealthy = lastHealthState != State.SERVICE_HEALTHY;
    }
    if (unhealthy) {
      throw new ServiceFailedException("Unable to become active. " +
          "Service became unhealthy while trying to failover.");
    }
    throw new ServiceFailedException("Unable to become active. " +
        "Local node did not get an opportunity to do so from ZooKeeper, " +
        "or the local node took too long to transition to active.");
  }

  // Phase 5. At this point, we made some attempt to become active. So we
  // can tell the old active to rejoin if it wants. This allows a quick
  // fail-back if we immediately crash.
  remoteZkfc.cedeActive(-1);

  if (!attempt.succeeded) {
    // Propagate failure
    throw new ServiceFailedException(
        "Failed to become active. " + attempt.status);
  }
  LOG.info("Successfully became active. " + attempt.status);
}
/**
 * Verifies that the local node may be used as a graceful failover target,
 * which requires its last reported health state to be healthy.
 *
 * @throws ServiceFailedException if the node is unhealthy
 */
private synchronized void checkEligibleForFailover()
    throws ServiceFailedException {
  // Check health
  if (this.getLastHealthState() == State.SERVICE_HEALTHY) {
    return;
  }
  throw new ServiceFailedException(
      localTarget + " is not currently healthy. " +
      "Cannot be failover target");
}
/**
 * Looks up which node the elector currently reports as active.
 *
 * @return an {@link HAServiceTarget} for the current active node
 *         in the cluster, or null if no node is active.
 * @throws IOException if a ZK-related issue occurs
 * @throws InterruptedException if thread is interrupted
 */
private HAServiceTarget getCurrentActive()
    throws IOException, InterruptedException {
  // Locks are taken in the order elector -> ZKFC.
  synchronized (elector) {
    synchronized (this) {
      final byte[] activeNodeInfo;
      try {
        activeNodeInfo = elector.getActiveData();
      } catch (ActiveNotFoundException notFound) {
        // Nobody is active right now.
        return null;
      } catch (KeeperException zkError) {
        throw new IOException(
            "Unexpected ZooKeeper issue fetching active node info", zkError);
      }
      return dataToTarget(activeNodeInfo);
    }
  }
}
/**
* Check the current state of the service, and join the election
* if it should be in the election.
*
* While the join delay ({@code delayJoiningUntilNanotime}) has not yet
* expired, nothing is joined or quit; another recheck is scheduled for when
* the delay runs out. Otherwise the action depends on the last health
* state: join when healthy, quit without fencing while initializing, quit
* with fencing when unhealthy or not responding, and raise a fatal error if
* the health monitor itself failed.
*/
private void recheckElectability() {
// Maintain lock ordering of elector -> ZKFC
synchronized (elector) {
synchronized (this) {
boolean healthy = lastHealthState == State.SERVICE_HEALTHY;
// NOTE(review): System.nanoTime() is allowed to return negative values;
// with the "no delay" value of 0 this difference would then be positive.
// Confirm this cannot occur on supported platforms.
long remainingDelay = delayJoiningUntilNanotime - System.nanoTime();
if (remainingDelay > 0) {
if (healthy) {
LOG.info("Would have joined master election, but this node is " +
"prohibited from doing so for " +
TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
}
// Come back once the delay has elapsed; do not touch the election now.
scheduleRecheck(remainingDelay);
return;
}
switch (lastHealthState) {
case SERVICE_HEALTHY:
elector.joinElection(targetToData(localTarget));
break;
case INITIALIZING:
LOG.info("Ensuring that " + localTarget + " does not " +
"participate in active master election");
// false: quit without requesting fencing.
elector.quitElection(false);
break;
case SERVICE_UNHEALTHY:
case SERVICE_NOT_RESPONDING:
LOG.info("Quitting master election for " + localTarget +
" and marking that fencing is necessary");
// true: quit and request fencing.
elector.quitElection(true);
break;
case HEALTH_MONITOR_FAILED:
fatalError("Health monitor failed!");
break;
default:
throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
}
}
}
}
/**
 * Arranges for {@link #recheckElectability()} to run later on the delay
 * executor. Anything thrown by the recheck is turned into a fatal error.
 *
 * @param whenNanos delay, in nanoseconds, before the recheck runs
 */
private void scheduleRecheck(long whenNanos) {
  Runnable recheckTask = new Runnable() {
    @Override
    public void run() {
      try {
        recheckElectability();
      } catch (Throwable t) {
        fatalError("Failed to recheck electability: " +
            StringUtils.stringifyException(t));
      }
    }
  };
  delayExecutor.schedule(recheckTask, whenNanos, TimeUnit.NANOSECONDS);
}
/**
* @return the last health state passed to the FC
@ -383,6 +800,11 @@ public abstract class ZKFailoverController implements Tool {
ActiveStandbyElector getElectorForTests() {
return elector;
}
/**
* @return the RPC server held by this controller, exposed for tests
*/
@VisibleForTesting
ZKFCRpcServer getRpcServerForTests() {
return rpcServer;
}
/**
* Callbacks from elector
@ -409,32 +831,7 @@ public abstract class ZKFailoverController implements Tool {
@Override
public void fenceOldActive(byte[] data) {
HAServiceTarget target = dataToTarget(data);
LOG.info("Should fence: " + target);
boolean gracefulWorked = new FailoverController(conf,
RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
if (gracefulWorked) {
// It's possible that it's in standby but just about to go into active,
// no? Is there some race here?
LOG.info("Successfully transitioned " + target + " to standby " +
"state without fencing");
return;
}
try {
target.checkFencingConfigured();
} catch (BadFencingConfigurationException e) {
LOG.error("Couldn't fence old active " + target, e);
// TODO: see below todo
throw new RuntimeException(e);
}
if (!target.getFencer().fence(target)) {
// TODO: this will end up in some kind of tight loop,
// won't it? We need some kind of backoff
throw new RuntimeException("Unable to fence " + target);
}
ZKFailoverController.this.fenceOldActive(data);
}
@Override
@ -451,34 +848,21 @@ public abstract class ZKFailoverController implements Tool {
public void enteredState(HealthMonitor.State newState) {
LOG.info("Local service " + localTarget +
" entered state: " + newState);
switch (newState) {
case SERVICE_HEALTHY:
LOG.info("Joining master election for " + localTarget);
elector.joinElection(targetToData(localTarget));
break;
case INITIALIZING:
LOG.info("Ensuring that " + localTarget + " does not " +
"participate in active master election");
elector.quitElection(false);
break;
case SERVICE_UNHEALTHY:
case SERVICE_NOT_RESPONDING:
LOG.info("Quitting master election for " + localTarget +
" and marking that fencing is necessary");
elector.quitElection(true);
break;
case HEALTH_MONITOR_FAILED:
fatalError("Health monitor failed!");
break;
default:
throw new IllegalArgumentException("Unhandled state:" + newState);
}
lastHealthState = newState;
recheckElectability();
}
}
/**
 * Immutable outcome of a single attempt to become active: whether it
 * succeeded, a descriptive status message, and the
 * {@link System#nanoTime()} reading taken when the record was created.
 */
private static class ActiveAttemptRecord {
  /** True if the attempt succeeded. */
  private final boolean succeeded;
  /** Message describing the outcome. */
  private final String status;
  /** {@code System.nanoTime()} at construction. */
  private final long nanoTime;

  /**
   * @param succeeded whether the attempt succeeded
   * @param status message describing the outcome
   */
  public ActiveAttemptRecord(boolean succeeded, String status) {
    this.succeeded = succeeded;
    this.status = status;
    // Stamp the record so waiters can tell fresh records from stale ones.
    this.nanoTime = System.nanoTime();
  }
}
}

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveRequestProto;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverRequestProto;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
 * Client-side adapter that exposes {@link ZKFCProtocol} on top of a protobuf
 * RPC proxy ({@link ZKFCProtocolPB}). Each call builds the matching request
 * message, forwards it through the proxy, and unwraps any
 * {@link ServiceException} via {@link ProtobufHelper#getRemoteException}.
 */
public class ZKFCProtocolClientSideTranslatorPB implements
    ZKFCProtocol, Closeable, ProtocolTranslator {

  /** The proxy methods are always invoked without an RPC controller. */
  private static final RpcController NULL_CONTROLLER = null;

  /** Protobuf proxy that all calls are delegated to. */
  private final ZKFCProtocolPB rpcProxy;

  /**
   * Registers the protobuf engine for {@link ZKFCProtocolPB} in {@code conf}
   * and creates a proxy to {@code addr} on behalf of the current user.
   *
   * @param addr address passed to {@code RPC.getProxy}
   * @param conf configuration used for engine registration and the proxy
   * @param socketFactory socket factory passed to {@code RPC.getProxy}
   * @param timeout timeout value passed to {@code RPC.getProxy}
   * @throws IOException if the current user or the proxy cannot be obtained
   */
  public ZKFCProtocolClientSideTranslatorPB(
      InetSocketAddress addr, Configuration conf,
      SocketFactory socketFactory, int timeout) throws IOException {
    RPC.setProtocolEngine(conf, ZKFCProtocolPB.class, ProtobufRpcEngine.class);
    final long protocolVersion = RPC.getProtocolVersion(ZKFCProtocolPB.class);
    final UserGroupInformation caller = UserGroupInformation.getCurrentUser();
    rpcProxy = RPC.getProxy(ZKFCProtocolPB.class, protocolVersion, addr,
        caller, conf, socketFactory, timeout);
  }

  /**
   * Sends a cedeActive request carrying {@code millisToCede}.
   */
  @Override
  public void cedeActive(int millisToCede) throws IOException,
      AccessControlException {
    final CedeActiveRequestProto request =
        CedeActiveRequestProto.newBuilder()
            .setMillisToCede(millisToCede)
            .build();
    try {
      rpcProxy.cedeActive(NULL_CONTROLLER, request);
    } catch (ServiceException se) {
      throw ProtobufHelper.getRemoteException(se);
    }
  }

  /**
   * Sends a gracefulFailover request; the request message has no fields.
   */
  @Override
  public void gracefulFailover() throws IOException, AccessControlException {
    final GracefulFailoverRequestProto request =
        GracefulFailoverRequestProto.getDefaultInstance();
    try {
      rpcProxy.gracefulFailover(NULL_CONTROLLER, request);
    } catch (ServiceException se) {
      throw ProtobufHelper.getRemoteException(se);
    }
  }

  /** Stops the underlying RPC proxy. */
  @Override
  public void close() {
    RPC.stopProxy(rpcProxy);
  }

  /** @return the protobuf proxy this translator wraps */
  @Override
  public Object getUnderlyingProxyObject() {
    return rpcProxy;
  }
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.ZKFCProtocolService;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo;
/**
* Protobuf RPC interface for the ZKFC protocol. It combines the generated
* ZKFCProtocolService.BlockingInterface with VersionedProtocol and declares
* no methods of its own. The annotations below register it under the
* protocol name "org.apache.hadoop.ha.ZKFCProtocol", version 1, and name
* the configuration key used for the Kerberos server principal.
*/
@KerberosInfo(
serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@ProtocolInfo(protocolName = "org.apache.hadoop.ha.ZKFCProtocol",
protocolVersion = 1)
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface ZKFCProtocolPB extends
ZKFCProtocolService.BlockingInterface, VersionedProtocol {
/**
* If any methods need annotations, they can be added here.
*/
}

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha.protocolPB;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveRequestProto;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveResponseProto;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverRequestProto;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverResponseProto;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
 * Server-side adapter for the ZKFC protocol: receives protobuf requests
 * through {@link ZKFCProtocolPB}, forwards them to a {@link ZKFCProtocol}
 * implementation, and returns the (empty) protobuf responses. Any
 * {@link IOException} thrown by the implementation is wrapped in a
 * {@link ServiceException}.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public class ZKFCProtocolServerSideTranslatorPB implements
    ZKFCProtocolPB {

  /** The implementation that requests are delegated to. */
  private final ZKFCProtocol server;

  /**
   * @param server implementation that will handle the translated calls
   */
  public ZKFCProtocolServerSideTranslatorPB(ZKFCProtocol server) {
    this.server = server;
  }

  /**
   * Forwards the request's {@code millisToCede} to
   * {@link ZKFCProtocol#cedeActive(int)}.
   *
   * @throws ServiceException wrapping any IOException from the server
   */
  @Override
  public CedeActiveResponseProto cedeActive(RpcController controller,
      CedeActiveRequestProto request) throws ServiceException {
    try {
      server.cedeActive(request.getMillisToCede());
      return CedeActiveResponseProto.getDefaultInstance();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Forwards the request to {@link ZKFCProtocol#gracefulFailover()}.
   *
   * @throws ServiceException wrapping any IOException from the server
   */
  @Override
  public GracefulFailoverResponseProto gracefulFailover(
      RpcController controller, GracefulFailoverRequestProto request)
      throws ServiceException {
    try {
      server.gracefulFailover();
      return GracefulFailoverResponseProto.getDefaultInstance();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * @return the version declared on {@link ZKFCProtocolPB}; the arguments
   *         are not consulted
   */
  @Override
  public long getProtocolVersion(String protocol, long clientVersion)
      throws IOException {
    return RPC.getProtocolVersion(ZKFCProtocolPB.class);
  }

  /**
   * Returns the protocol signature for {@link ZKFCProtocolPB}.
   *
   * @throws IOException if {@code protocol} is not the ZKFC protocol name
   */
  @Override
  public ProtocolSignature getProtocolSignature(String protocol,
      long clientVersion, int clientMethodsHash) throws IOException {
    if (!protocol.equals(RPC.getProtocolName(ZKFCProtocolPB.class))) {
      throw new IOException("Serverside implements " +
          RPC.getProtocolName(ZKFCProtocolPB.class) +
          ". The following requested protocol is unknown: " + protocol);
    }
    // The signature must describe the protocol this class actually serves.
    // The original passed HAServiceProtocolPB.class here, which reports the
    // methods of a different protocol to the client.
    return ProtocolSignature.getProtocolSignature(clientMethodsHash,
        RPC.getProtocolVersion(ZKFCProtocolPB.class),
        ZKFCProtocolPB.class);
  }
}

View File

@ -223,6 +223,12 @@
<description>ACL for HAService protocol used by HAAdmin to manage the
active and stand-by states of namenode.</description>
</property>
<property>
<name>security.zkfc.protocol.acl</name>
<value>*</value>
<description>ACL for access to the ZK Failover Controller
</description>
</property>
<property>
<name>security.mrhs.client.protocol.acl</name>

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.ha.proto";
option java_outer_classname = "ZKFCProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
/**
* Request for the cedeActive RPC.
*/
message CedeActiveRequestProto {
// Number of milliseconds; the server-side translator passes this value
// straight to ZKFCProtocol.cedeActive().
required uint32 millisToCede = 1;
}
/**
* Response for the cedeActive RPC; carries no fields.
*/
message CedeActiveResponseProto {
}
/**
* Request for the gracefulFailover RPC; carries no fields.
*/
message GracefulFailoverRequestProto {
}
/**
* Response for the gracefulFailover RPC; carries no fields.
*/
message GracefulFailoverResponseProto {
}
/**
* Protocol provides manual control of the ZK Failover Controllers
*/
service ZKFCProtocolService {
/**
* Request that the service cede its active state, and quit the election
* for some amount of time
*/
rpc cedeActive(CedeActiveRequestProto)
returns(CedeActiveResponseProto);
/**
* Request a graceful failover.
* NOTE(review): the tests added with this protocol send this call to the
* ZKFC of the node that is expected to become active; confirm against the
* ZKFCProtocol Java interface before relying on that reading.
*/
rpc gracefulFailover(GracefulFailoverRequestProto)
returns(GracefulFailoverResponseProto);
}

View File

@ -40,13 +40,15 @@ class DummyHAService extends HAServiceTarget {
private static final String DUMMY_FENCE_KEY = "dummy.fence.key";
volatile HAServiceState state;
HAServiceProtocol proxy;
ZKFCProtocol zkfcProxy = null;
NodeFencer fencer;
InetSocketAddress address;
boolean isHealthy = true;
boolean actUnreachable = false;
boolean failToBecomeActive;
boolean failToBecomeActive, failToBecomeStandby, failToFence;
DummySharedResource sharedResource;
public int fenceCount = 0;
static ArrayList<DummyHAService> instances = Lists.newArrayList();
int index;
@ -82,12 +84,24 @@ class DummyHAService extends HAServiceTarget {
return address;
}
/**
* The dummy service exposes no ZKFC address; this always returns null.
*/
@Override
public InetSocketAddress getZKFCAddress() {
return null;
}
/**
* Returns the {@code proxy} field as-is; {@code conf} and {@code timeout}
* are ignored.
*/
@Override
public HAServiceProtocol getProxy(Configuration conf, int timeout)
throws IOException {
return proxy;
}
/**
* Returns the {@code zkfcProxy} field; {@code conf} and {@code timeout} are
* ignored. The field starts out null and must have been assigned first.
*/
@Override
public ZKFCProtocol getZKFCProxy(Configuration conf, int timeout)
throws IOException {
// Only enforced when the JVM runs with assertions enabled (-ea).
assert zkfcProxy != null;
return zkfcProxy;
}
@Override
public NodeFencer getFencer() {
return fencer;
@ -139,6 +153,9 @@ class DummyHAService extends HAServiceTarget {
public void transitionToStandby(StateChangeRequestInfo req) throws ServiceFailedException,
AccessControlException, IOException {
checkUnreachable();
if (failToBecomeStandby) {
throw new ServiceFailedException("injected failure");
}
if (sharedResource != null) {
sharedResource.release(DummyHAService.this);
}
@ -167,7 +184,6 @@ class DummyHAService extends HAServiceTarget {
}
public static class DummyFencer implements FenceMethod {
public void checkArgs(String args) throws BadFencingConfigurationException {
}
@ -176,6 +192,13 @@ class DummyHAService extends HAServiceTarget {
throws BadFencingConfigurationException {
LOG.info("tryFence(" + target + ")");
DummyHAService svc = (DummyHAService)target;
synchronized (svc) {
svc.fenceCount++;
}
if (svc.failToFence) {
LOG.info("Injected failure to fence");
return false;
}
svc.sharedResource.release(svc);
return true;
}

View File

@ -29,6 +29,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
import org.apache.zookeeper.KeeperException.NoNodeException;
@ -126,6 +128,10 @@ public class MiniZKFCCluster {
public ActiveStandbyElector getElector(int i) {
return thrs[i].zkfc.getElectorForTests();
}
/**
* @param i index into the cluster's ZKFC threads
* @return the DummyZKFC run by thread {@code i}
*/
public DummyZKFC getZkfc(int i) {
return thrs[i].zkfc;
}
public void setHealthy(int idx, boolean healthy) {
svcs[idx].isHealthy = healthy;
@ -134,6 +140,14 @@ public class MiniZKFCCluster {
public void setFailToBecomeActive(int idx, boolean doFail) {
svcs[idx].failToBecomeActive = doFail;
}
/**
* Sets the {@code failToBecomeStandby} flag on service {@code idx}.
*/
public void setFailToBecomeStandby(int idx, boolean doFail) {
svcs[idx].failToBecomeStandby = doFail;
}
/**
* Sets the {@code failToFence} flag on service {@code idx}.
*/
public void setFailToFence(int idx, boolean doFail) {
svcs[idx].failToFence = doFail;
}
public void setUnreachable(int idx, boolean unreachable) {
svcs[idx].actUnreachable = unreachable;
@ -290,5 +304,25 @@ public class MiniZKFCCluster {
protected String getScopeInsideParentNode() {
return DUMMY_CLUSTER;
}
/**
* Intentionally empty: this test subclass performs no admin access check.
*/
@Override
protected void checkRpcAdminAccess() throws AccessControlException {
}
/**
* @return a wildcard socket address with port 0, which lets the system pick
* the port when the address is bound
*/
@Override
protected InetSocketAddress getRpcAddressToBindTo() {
return new InetSocketAddress(0);
}
/**
* Runs the superclass RPC initialization, then stores the object returned by
* getRpcServerForTests() in {@code localTarget.zkfcProxy}.
*/
@Override
protected void initRPC() throws IOException {
super.initRPC();
localTarget.zkfcProxy = this.getRpcServerForTests();
}
/**
* @return null; this test subclass supplies no policy provider
*/
@Override
protected PolicyProvider getPolicyProvider() {
return null;
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
@ -378,6 +379,205 @@ public class TestZKFailoverController extends ClientBaseWithFixes {
cluster.stop();
}
}
/**
* Test that the ZKFC can gracefully cede its active status.
*/
@Test(timeout=15000)
public void testCedeActive() throws Exception {
try {
cluster.start();
DummyZKFC zkfc = cluster.getZkfc(0);
// It should be in active to start.
assertEquals(ActiveStandbyElector.State.ACTIVE,
zkfc.getElectorForTests().getStateForTests());
// Ask it to cede active for 3 seconds. It should respond promptly
// (i.e. the RPC itself should not take 3 seconds!)
ZKFCProtocol proxy = zkfc.getLocalTarget().getZKFCProxy(conf, 5000);
long st = System.currentTimeMillis();
proxy.cedeActive(3000);
long et = System.currentTimeMillis();
assertTrue("RPC to cedeActive took " + (et - st) + " ms",
et - st < 1000);
// Should be in "INIT" state since it's not in the election
// at this point.
assertEquals(ActiveStandbyElector.State.INIT,
zkfc.getElectorForTests().getStateForTests());
// After the prescribed 3 seconds, should go into STANDBY state,
// since the other node in the cluster would have taken ACTIVE.
cluster.waitForElectorState(0, ActiveStandbyElector.State.STANDBY);
long et2 = System.currentTimeMillis();
assertTrue("Should take ~3 seconds to rejoin. Only took " + (et2 - et) +
"ms before rejoining.",
et2 - et > 2800);
} finally {
cluster.stop();
}
}
/**
* Test graceful failover in both directions: from service 0 to service 1 and
* back again, verifying that no fencing was recorded on either service.
*/
@Test(timeout=15000)
public void testGracefulFailover() throws Exception {
try {
cluster.start();
cluster.waitForActiveLockHolder(0);
// Request failover via service 1's ZKFC, then via service 0's to go back.
cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
cluster.waitForActiveLockHolder(1);
cluster.getService(0).getZKFCProxy(conf, 5000).gracefulFailover();
cluster.waitForActiveLockHolder(0);
// Neither service should have been fenced along the way.
assertEquals(0, cluster.getService(0).fenceCount);
assertEquals(0, cluster.getService(1).fenceCount);
} finally {
cluster.stop();
}
}
/**
* Test that a graceful failover request sent to the ZKFC of an unhealthy
* service is rejected with a ServiceFailedException saying the service is
* not currently healthy.
*/
@Test(timeout=15000)
public void testGracefulFailoverToUnhealthy() throws Exception {
try {
cluster.start();
cluster.waitForActiveLockHolder(0);
// Mark it unhealthy, wait for it to exit election
cluster.setHealthy(1, false);
cluster.waitForElectorState(1, ActiveStandbyElector.State.INIT);
// Ask for failover, it should fail, because it's unhealthy
try {
cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
fail("Did not fail to graceful failover to unhealthy service!");
} catch (ServiceFailedException sfe) {
GenericTestUtils.assertExceptionContains(
cluster.getService(1).toString() +
" is not currently healthy.", sfe);
}
} finally {
cluster.stop();
}
}
/**
* Test a graceful failover in which the target service is set to fail its
* transition to active: the request must fail with a
* ServiceFailedException carrying the injected failure, no fencing may
* occur, and service 0 must end up holding the active lock again.
*/
@Test(timeout=15000)
public void testGracefulFailoverFailBecomingActive() throws Exception {
try {
cluster.start();
cluster.waitForActiveLockHolder(0);
cluster.setFailToBecomeActive(1, true);
// Ask for failover, it should fail and report back to user.
try {
cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
fail("Did not fail to graceful failover when target failed " +
"to become active!");
} catch (ServiceFailedException sfe) {
GenericTestUtils.assertExceptionContains(
"Couldn't make " + cluster.getService(1) + " active", sfe);
GenericTestUtils.assertExceptionContains(
"injected failure", sfe);
}
// No fencing
assertEquals(0, cluster.getService(0).fenceCount);
assertEquals(0, cluster.getService(1).fenceCount);
// Service 0 should go back to being active after the failed failover
cluster.waitForActiveLockHolder(0);
} finally {
cluster.stop();
}
}
/**
* Test a graceful failover in which the old active (service 0) is set to
* fail its transition to standby: the failover call is expected to complete
* and service 0 must have been fenced exactly once.
*/
@Test(timeout=15000)
public void testGracefulFailoverFailBecomingStandby() throws Exception {
try {
cluster.start();
cluster.waitForActiveLockHolder(0);
// Ask for failover when old node fails to transition to standby.
// This should trigger fencing, since the cedeActive() command
// still works, but leaves the breadcrumb in place.
cluster.setFailToBecomeStandby(0, true);
cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
// Check that the old node was fenced
assertEquals(1, cluster.getService(0).fenceCount);
} finally {
cluster.stop();
}
}
/**
* Test a graceful failover in which the old active (service 0) is set to
* fail both its transition to standby and the subsequent fencing: the
* request must fail with a ServiceFailedException reporting that service 0
* could not be fenced.
*/
@Test(timeout=15000)
public void testGracefulFailoverFailBecomingStandbyAndFailFence()
throws Exception {
try {
cluster.start();
cluster.waitForActiveLockHolder(0);
// Ask for failover when old node fails to transition to standby.
// This should trigger fencing, since the cedeActive() command
// still works, but leaves the breadcrumb in place.
cluster.setFailToBecomeStandby(0, true);
// Additionally make the fencer report failure for service 0.
cluster.setFailToFence(0, true);
try {
cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
fail("Failover should have failed when old node wont fence");
} catch (ServiceFailedException sfe) {
GenericTestUtils.assertExceptionContains(
"Unable to fence " + cluster.getService(0), sfe);
}
} finally {
cluster.stop();
}
}
/**
* Test which exercises all of the inputs into ZKFC. This is particularly
* useful for running under jcarder to check for lock order violations.
*
* The sequence is: failover by session expiration (both directions), a
* ZooKeeper server restart, failover driven by health state, and finally
* graceful failovers in both directions.
*/
@Test(timeout=30000)
public void testOneOfEverything() throws Exception {
try {
cluster.start();
// Failover by session expiration
LOG.info("====== Failing over by session expiration");
cluster.expireAndVerifyFailover(0, 1);
cluster.expireAndVerifyFailover(1, 0);
// Restart ZK
LOG.info("====== Restarting server");
stopServer();
waitForServerDown(hostPort, CONNECTION_TIMEOUT);
startServer();
waitForServerUp(hostPort, CONNECTION_TIMEOUT);
// Failover by bad health
cluster.setHealthy(0, false);
cluster.waitForHAState(0, HAServiceState.STANDBY);
cluster.waitForHAState(1, HAServiceState.ACTIVE);
// NOTE(review): the next two calls repeat health values already in effect
// within this test (service 1 was never marked unhealthy here and service
// 0 was marked unhealthy just above), so the two waits that follow
// re-check the same states. Possibly a fail-back to service 0 was
// intended - confirm with the author.
cluster.setHealthy(1, true);
cluster.setHealthy(0, false);
cluster.waitForHAState(1, HAServiceState.ACTIVE);
cluster.waitForHAState(0, HAServiceState.STANDBY);
cluster.setHealthy(0, true);
cluster.waitForHealthState(0, State.SERVICE_HEALTHY);
// Graceful failovers
cluster.getZkfc(1).gracefulFailoverToYou();
cluster.getZkfc(0).gracefulFailoverToYou();
} finally {
cluster.stop();
}
}
private int runFC(DummyHAService target, String ... args) throws Exception {
DummyZKFC zkfc = new DummyZKFC(target);

View File

@ -352,4 +352,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";
public static final String DFS_HA_AUTO_FAILOVER_ENABLED_KEY = "dfs.ha.automatic-failover.enabled";
public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false;
public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port";
public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019;
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -47,6 +48,8 @@ public class HDFSPolicyProvider extends PolicyProvider {
new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
HAServiceProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,
ZKFCProtocol.class),
new Service(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_POLICY,
RefreshAuthorizationPolicyProtocol.class),

View File

@ -181,7 +181,8 @@ public class NameNode {
DFS_NAMENODE_BACKUP_ADDRESS_KEY,
DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY,
DFS_HA_FENCE_METHODS_KEY
DFS_HA_FENCE_METHODS_KEY,
DFS_HA_ZKFC_PORT_KEY
};
/**

View File

@ -30,11 +30,18 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.ZKFailoverController;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos.ActiveNodeInfo;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
@ -48,6 +55,7 @@ public class DFSZKFailoverController extends ZKFailoverController {
LogFactory.getLog(DFSZKFailoverController.class);
private NNHAServiceTarget localTarget;
private Configuration localNNConf;
private AccessControlList adminAcl;
@Override
protected HAServiceTarget dataToTarget(byte[] data) {
@ -68,21 +76,43 @@ public class DFSZKFailoverController extends ZKFailoverController {
ret + ": Stored protobuf was " + proto + ", address from our own " +
"configuration for this NameNode was " + ret.getAddress());
}
ret.setZkfcPort(proto.getZkfcPort());
return ret;
}
@Override
protected byte[] targetToData(HAServiceTarget target) {
InetSocketAddress addr = target.getAddress();
return ActiveNodeInfo.newBuilder()
.setHostname(addr.getHostName())
.setPort(addr.getPort())
.setZkfcPort(target.getZKFCAddress().getPort())
.setNameserviceId(localTarget.getNameServiceId())
.setNamenodeId(localTarget.getNameNodeId())
.build()
.toByteArray();
}
/**
* @return a socket address made of the local target's IP address and the
* ZKFC port read from the local NameNode configuration
*/
@Override
protected InetSocketAddress getRpcAddressToBindTo() {
int zkfcPort = getZkfcPort(localNNConf);
return new InetSocketAddress(localTarget.getAddress().getAddress(),
zkfcPort);
}
/**
* @return a new HDFSPolicyProvider
*/
@Override
protected PolicyProvider getPolicyProvider() {
return new HDFSPolicyProvider();
}
/**
* Reads the ZKFC port from {@code conf}.
*
* @param conf configuration to read DFS_HA_ZKFC_PORT_KEY from
* @return the configured port, or DFS_HA_ZKFC_PORT_DEFAULT if unset
*/
static int getZkfcPort(Configuration conf) {
return conf.getInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY,
DFSConfigKeys.DFS_HA_ZKFC_PORT_DEFAULT);
}
@Override
public void setConf(Configuration conf) {
localNNConf = DFSHAAdmin.addSecurityConfiguration(conf);
@ -98,10 +128,21 @@ public class DFSZKFailoverController extends ZKFailoverController {
localTarget = new NNHAServiceTarget(localNNConf, nsId, nnId);
// Setup ACLs
adminAcl = new AccessControlList(
conf.get(DFSConfigKeys.DFS_ADMIN, " "));
super.setConf(localNNConf);
LOG.info("Failover controller configured for NameNode " +
nsId + "." + nnId);
}
/**
* Runs the superclass RPC initialization, then records on the local target
* the port reported by the RPC server's address.
*/
@Override
protected void initRPC() throws IOException {
super.initRPC();
localTarget.setZkfcPort(rpcServer.getAddress().getPort());
}
@Override
public HAServiceTarget getLocalTarget() {
@ -127,4 +168,19 @@ public class DFSZKFailoverController extends ZKFailoverController {
System.exit(ToolRunner.run(
new DFSZKFailoverController(), args));
}
@Override
protected void checkRpcAdminAccess() throws IOException, AccessControlException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation zkfcUgi = UserGroupInformation.getLoginUser();
if (adminAcl.isUserAllowed(ugi) ||
ugi.getShortUserName().equals(zkfcUgi.getShortUserName())) {
LOG.info("Allowed RPC access from " + ugi + " at " + Server.getRemoteAddress());
return;
}
String msg = "Disallowed RPC access from " + ugi + " at " +
Server.getRemoteAddress() + ". Not listed in " + DFSConfigKeys.DFS_ADMIN;
LOG.warn(msg);
throw new AccessControlException(msg);
}
}

View File

@ -45,12 +45,13 @@ public class NNHAServiceTarget extends HAServiceTarget {
private static final String NAMENODE_ID_KEY = "namenodeid";
private final InetSocketAddress addr;
private InetSocketAddress zkfcAddr;
private NodeFencer fencer;
private BadFencingConfigurationException fenceConfigError;
private final String nnId;
private final String nsId;
private final boolean autoFailoverEnabled;
public NNHAServiceTarget(Configuration conf,
String nsId, String nnId) {
Preconditions.checkNotNull(nnId);
@ -77,17 +78,26 @@ public class NNHAServiceTarget extends HAServiceTarget {
}
this.addr = NetUtils.createSocketAddr(serviceAddr,
NameNode.DEFAULT_PORT);
this.autoFailoverEnabled = targetConf.getBoolean(
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
if (autoFailoverEnabled) {
int port = DFSZKFailoverController.getZkfcPort(targetConf);
if (port != 0) {
setZkfcPort(port);
}
}
try {
this.fencer = NodeFencer.create(targetConf,
DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY);
} catch (BadFencingConfigurationException e) {
this.fenceConfigError = e;
}
this.nnId = nnId;
this.nsId = nsId;
this.autoFailoverEnabled = targetConf.getBoolean(
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
}
/**
@ -98,6 +108,21 @@ public class NNHAServiceTarget extends HAServiceTarget {
return addr;
}
/**
* @return the ZKFC address previously set through setZkfcPort()
* @throws IllegalStateException if automatic failover is not enabled for
* this target
*/
@Override
public InetSocketAddress getZKFCAddress() {
Preconditions.checkState(autoFailoverEnabled,
"ZKFC address not relevant when auto failover is off");
// Only enforced when the JVM runs with assertions enabled (-ea); otherwise
// a null address is returned if no port has been set yet.
assert zkfcAddr != null;
return zkfcAddr;
}
/**
* Sets the ZKFC address to the IP address of this target's {@code addr}
* combined with the given port.
*
* @param port the ZKFC port
*/
void setZkfcPort(int port) {
// Only enforced when the JVM runs with assertions enabled (-ea).
assert autoFailoverEnabled;
this.zkfcAddr = new InetSocketAddress(addr.getAddress(), port);
}
@Override
public void checkFencingConfigured() throws BadFencingConfigurationException {
if (fenceConfigError != null) {

View File

@ -24,4 +24,5 @@ message ActiveNodeInfo {
required string hostname = 3;
required int32 port = 4;
required int32 zkfcPort = 5;
}

View File

@ -22,10 +22,10 @@ import static org.junit.Assert.*;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.ZKFailoverController;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.TestNodeFencer.AlwaysSucceedFencer;
@ -62,6 +62,15 @@ public class TestDFSZKFailoverController extends ClientBaseWithFixes {
AlwaysSucceedFencer.class.getName());
conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
// Turn off IPC client caching, so that the suite can handle
// the restart of the daemons between test cases.
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn1", 10003);
conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn2", 10004);
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001))
@ -100,18 +109,6 @@ public class TestDFSZKFailoverController extends ClientBaseWithFixes {
}
}
/**
* Test that, when automatic failover is enabled, the manual
* failover script refuses to run.
*/
@Test(timeout=10000)
public void testManualFailoverIsDisabled() throws Exception {
DFSHAAdmin admin = new DFSHAAdmin();
admin.setConf(conf);
int rc = admin.run(new String[]{"-failover", "nn1", "nn2"});
assertEquals(-1, rc);
}
/**
* Test that automatic failover is triggered by shutting the
* active NN down.
@ -148,6 +145,29 @@ public class TestDFSZKFailoverController extends ClientBaseWithFixes {
thr2.zkfc.getLocalTarget().getAddress());
}
/**
* Test manual failover through the ZKFC RPC interface: request a graceful
* failover via the second ZKFC and wait for NN 1 to be active, then
* request one via the first ZKFC and wait for NN 0 to be active again.
*/
@Test(timeout=30000)
public void testManualFailover() throws Exception {
thr2.zkfc.getLocalTarget().getZKFCProxy(conf, 15000).gracefulFailover();
waitForHAState(0, HAServiceState.STANDBY);
waitForHAState(1, HAServiceState.ACTIVE);
thr1.zkfc.getLocalTarget().getZKFCProxy(conf, 15000).gracefulFailover();
waitForHAState(0, HAServiceState.ACTIVE);
waitForHAState(1, HAServiceState.STANDBY);
}
@Test(timeout=30000)
public void testManualFailoverWithDFSHAAdmin() throws Exception {
DFSHAAdmin tool = new DFSHAAdmin();
tool.setConf(conf);
tool.run(new String[]{"-failover", "nn1", "nn2"});
waitForHAState(0, HAServiceState.STANDBY);
waitForHAState(1, HAServiceState.ACTIVE);
tool.run(new String[]{"-failover", "nn2", "nn1"});
waitForHAState(0, HAServiceState.ACTIVE);
waitForHAState(1, HAServiceState.STANDBY);
}
private void waitForHAState(int nnidx, final HAServiceState state)
throws TimeoutException, InterruptedException {
final NameNode nn = cluster.getNameNode(nnidx);

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.test.MockitoUtil;
import org.junit.Before;
@ -56,6 +56,7 @@ public class TestDFSHAAdmin {
private ByteArrayOutputStream errOutBytes = new ByteArrayOutputStream();
private String errOutput;
private HAServiceProtocol mockProtocol;
private ZKFCProtocol mockZkfcProtocol;
private static final String NSID = "ns1";
@ -88,6 +89,7 @@ public class TestDFSHAAdmin {
@Before
public void setup() throws IOException {
mockProtocol = MockitoUtil.mockProtocol(HAServiceProtocol.class);
mockZkfcProtocol = MockitoUtil.mockProtocol(ZKFCProtocol.class);
tool = new DFSHAAdmin() {
@Override
@ -97,7 +99,9 @@ public class TestDFSHAAdmin {
// OVerride the target to return our mock protocol
try {
Mockito.doReturn(mockProtocol).when(spy).getProxy(
Mockito.<Configuration>any(), Mockito.anyInt());
Mockito.<Configuration>any(), Mockito.anyInt());
Mockito.doReturn(mockZkfcProtocol).when(spy).getZKFCProxy(
Mockito.<Configuration>any(), Mockito.anyInt());
} catch (IOException e) {
throw new AssertionError(e); // mock setup doesn't really throw
}
@ -172,8 +176,6 @@ public class TestDFSHAAdmin {
assertTrue(errOutput.contains("Refusing to manually manage"));
assertEquals(-1, runTool("-transitionToStandby", "nn1"));
assertTrue(errOutput.contains("Refusing to manually manage"));
assertEquals(-1, runTool("-failover", "nn1", "nn2"));
assertTrue(errOutput.contains("Refusing to manually manage"));
Mockito.verify(mockProtocol, Mockito.never())
.transitionToActive(anyReqInfo());
@ -186,12 +188,10 @@ public class TestDFSHAAdmin {
assertEquals(0, runTool("-transitionToActive", "-forcemanual", "nn1"));
setupConfirmationOnSystemIn();
assertEquals(0, runTool("-transitionToStandby", "-forcemanual", "nn1"));
setupConfirmationOnSystemIn();
assertEquals(0, runTool("-failover", "-forcemanual", "nn1", "nn2"));
Mockito.verify(mockProtocol, Mockito.times(2)).transitionToActive(
Mockito.verify(mockProtocol, Mockito.times(1)).transitionToActive(
reqInfoCaptor.capture());
Mockito.verify(mockProtocol, Mockito.times(2)).transitionToStandby(
Mockito.verify(mockProtocol, Mockito.times(1)).transitionToStandby(
reqInfoCaptor.capture());
// All of the RPCs should have had the "force" source
@ -300,6 +300,19 @@ public class TestDFSHAAdmin {
tool.setConf(conf);
assertEquals(-1, runTool("-failover", "nn1", "nn2", "--forcefence"));
}
/**
* Test that, with automatic failover enabled in the configuration, the
* -failover command succeeds and results in a gracefulFailover() call on
* the mocked ZKFC protocol.
*/
@Test
public void testFailoverWithAutoHa() throws Exception {
Mockito.doReturn(STANDBY_READY_RESULT).when(mockProtocol).getServiceStatus();
// Turn on auto-HA in the config
HdfsConfiguration conf = getHAConf();
conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
conf.set(DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY, "shell(true)");
tool.setConf(conf);
assertEquals(0, runTool("-failover", "nn1", "nn2"));
Mockito.verify(mockZkfcProtocol).gracefulFailover();
}
@Test
public void testForceFenceOptionListedBeforeArgs() throws Exception {

View File

@ -116,5 +116,11 @@
<description>ACL for HAService protocol used by HAAdmin to manage the
active and stand-by states of namenode.</description>
</property>
<property>
<name>security.zkfc.protocol.acl</name>
<value>*</value>
<description>ACL for access to the ZK Failover Controller
</description>
</property>
</configuration>

View File

@ -710,6 +710,6 @@ digest:hdfs-zkfcs:vlUvLnd8MlacsE80rDuu6ONESbM=:rwcda
* <<How can I initiate a manual failover when automatic failover is
configured?>>
Currently, this facility is not yet implemented. Instead, you may simply stop
the active NameNode daemon. This will trigger an automatic failover. This
process will be improved in future versions.
Even if automatic failover is configured, you may initiate a manual failover
using the same <<<hdfs haadmin>>> command. It will perform a coordinated
failover.