From 9d5799553fea81920edfab611e5d485a97841848 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Thu, 3 May 2012 01:56:33 +0000 Subject: [PATCH] HADOOP-8279. Allow manual failover to be invoked when auto-failover is enabled. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1333288 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.HDFS-3042.txt | 2 + .../hadoop/fs/CommonConfigurationKeys.java | 2 + .../hadoop/ha/ActiveStandbyElector.java | 11 +- .../java/org/apache/hadoop/ha/HAAdmin.java | 49 +- .../org/apache/hadoop/ha/HAServiceTarget.java | 20 + .../org/apache/hadoop/ha/ZKFCProtocol.java | 101 ++++ .../org/apache/hadoop/ha/ZKFCRpcServer.java | 98 ++++ .../hadoop/ha/ZKFailoverController.java | 496 ++++++++++++++++-- .../ZKFCProtocolClientSideTranslatorPB.java | 90 ++++ .../hadoop/ha/protocolPB/ZKFCProtocolPB.java | 39 ++ .../ZKFCProtocolServerSideTranslatorPB.java | 88 ++++ .../packages/templates/conf/hadoop-policy.xml | 6 + .../src/main/proto/ZKFCProtocol.proto | 52 ++ .../org/apache/hadoop/ha/DummyHAService.java | 27 +- .../org/apache/hadoop/ha/MiniZKFCCluster.java | 34 ++ .../hadoop/ha/TestZKFailoverController.java | 200 +++++++ .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + .../hadoop/hdfs/HDFSPolicyProvider.java | 3 + .../hadoop/hdfs/server/namenode/NameNode.java | 3 +- .../hdfs/tools/DFSZKFailoverController.java | 56 ++ .../hadoop/hdfs/tools/NNHAServiceTarget.java | 33 +- .../hadoop-hdfs/src/main/proto/HAZKInfo.proto | 1 + .../ha/TestDFSZKFailoverController.java | 46 +- .../hadoop/hdfs/tools/TestDFSHAAdmin.java | 29 +- .../src/test/resources/hadoop-policy.xml | 6 + .../src/site/apt/HDFSHighAvailability.apt.vm | 6 +- 26 files changed, 1409 insertions(+), 91 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java 
create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java create mode 100644 hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto diff --git a/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt b/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt index 91979bbe686..18d948a8d3f 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt @@ -21,3 +21,5 @@ HADOOP-8246. Auto-HA: automatically scope znode by nameservice ID (todd) HADOOP-8247. Add a config to enable auto-HA, which disables manual FailoverController (todd) HADOOP-8306. ZKFC: improve error message when ZK is not running. (todd) + +HADOOP-8279. Allow manual failover to be invoked when auto-failover is enabled. 
(todd) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 52cb1f3c9a2..e2955ab9e71 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -116,6 +116,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { "security.refresh.user.mappings.protocol.acl"; public static final String SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl"; + public static final String + SECURITY_ZKFC_PROTOCOL_ACL = "security.zkfc.protocol.acl"; public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP = "hadoop.security.token.service.use_ip"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java index 50b85204be7..2081755fbf6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java @@ -378,7 +378,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback { createConnection(); } Stat stat = new Stat(); - return zkClient.getData(zkLockFilePath, false, stat); + return getDataWithRetries(zkLockFilePath, false, stat); } catch(KeeperException e) { Code code = e.code(); if (isNodeDoesNotExist(code)) { @@ -889,6 +889,15 @@ public class ActiveStandbyElector implements StatCallback, StringCallback { }); } + private byte[] getDataWithRetries(final String path, final boolean watch, + final Stat stat) throws InterruptedException, KeeperException { + return zkDoWithRetries(new ZKAction() { + public 
byte[] run() throws KeeperException, InterruptedException { + return zkClient.getData(path, watch, stat); + } + }); + } + private Stat setDataWithRetries(final String path, final byte[] data, final int version) throws InterruptedException, KeeperException { return zkDoWithRetries(new ZKAction() { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java index 95b46f9c42a..7d85c016deb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java @@ -38,6 +38,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; /** @@ -201,9 +202,26 @@ public abstract class HAAdmin extends Configured implements Tool { HAServiceTarget fromNode = resolveTarget(args[0]); HAServiceTarget toNode = resolveTarget(args[1]); - if (!checkManualStateManagementOK(fromNode) || - !checkManualStateManagementOK(toNode)) { - return -1; + // Check that auto-failover is consistently configured for both nodes. + Preconditions.checkState( + fromNode.isAutoFailoverEnabled() == + toNode.isAutoFailoverEnabled(), + "Inconsistent auto-failover configs between %s and %s!", + fromNode, toNode); + + if (fromNode.isAutoFailoverEnabled()) { + if (forceFence || forceActive) { + // -forceActive doesn't make sense with auto-HA, since, if the node + // is not healthy, then its ZKFC will immediately quit the election + // again the next time a health check runs. + // + // -forceFence doesn't seem to have any real use cases with auto-HA + // so it isn't implemented. 
+ errOut.println(FORCEFENCE + " and " + FORCEACTIVE + " flags not " + + "supported with auto-failover enabled."); + return -1; + } + return gracefulFailoverThroughZKFCs(toNode); } FailoverController fc = new FailoverController(getConf(), @@ -218,6 +236,31 @@ public abstract class HAAdmin extends Configured implements Tool { } return 0; } + + + /** + * Initiate a graceful failover by talking to the target node's ZKFC. + * This sends an RPC to the ZKFC, which coordinates the failover. + * + * @param toNode the node to fail to + * @return status code (0 for success) + * @throws IOException if failover does not succeed + */ + private int gracefulFailoverThroughZKFCs(HAServiceTarget toNode) + throws IOException { + + int timeout = FailoverController.getRpcTimeoutToNewActive(getConf()); + ZKFCProtocol proxy = toNode.getZKFCProxy(getConf(), timeout); + try { + proxy.gracefulFailover(); + out.println("Failover to " + toNode + " successful"); + } catch (ServiceFailedException sfe) { + errOut.println("Failover failed: " + sfe.getLocalizedMessage()); + return -1; + } + + return 0; + } private int checkHealth(final CommandLine cmd) throws IOException, ServiceFailedException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java index e46db94f036..56678b427e9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB; +import org.apache.hadoop.ha.protocolPB.ZKFCProtocolClientSideTranslatorPB; import org.apache.hadoop.net.NetUtils; import 
com.google.common.collect.Maps; @@ -48,6 +49,11 @@ public abstract class HAServiceTarget { */ public abstract InetSocketAddress getAddress(); + /** + * @return the IPC address of the ZKFC on the target node + */ + public abstract InetSocketAddress getZKFCAddress(); + /** * @return a Fencer implementation configured for this target node */ @@ -76,6 +82,20 @@ public abstract class HAServiceTarget { confCopy, factory, timeoutMs); } + /** + * @return a proxy to the ZKFC which is associated with this HA service. + */ + public ZKFCProtocol getZKFCProxy(Configuration conf, int timeoutMs) + throws IOException { + Configuration confCopy = new Configuration(conf); + // Lower the timeout so we quickly fail to connect + confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); + SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy); + return new ZKFCProtocolClientSideTranslatorPB( + getZKFCAddress(), + confCopy, factory, timeoutMs); + } + public final Map getFencingParameters() { Map ret = Maps.newHashMap(); addFencingParameters(ret); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java new file mode 100644 index 00000000000..02342f4084c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ha; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.retry.Idempotent; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.KerberosInfo; + +import java.io.IOException; + +/** + * Protocol exposed by the ZKFailoverController, allowing for graceful + * failover. + */ +@KerberosInfo( + serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY) +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface ZKFCProtocol { + /** + * Initial version of the protocol + */ + public static final long versionID = 1L; + + /** + * Request that this service yield from the active node election for the + * specified time period. + * + * If the node is not currently active, it simply prevents any attempts + * to become active for the specified time period. Otherwise, it first + * tries to transition the local service to standby state, and then quits + * the election. + * + * If the attempt to transition to standby succeeds, then the ZKFC receiving + * this RPC will delete its own breadcrumb node in ZooKeeper. Thus, the + * next node to become active will not run any fencing process. Otherwise, + * the breadcrumb will be left, such that the next active will fence this + * node. 
+ * + * After the specified time period elapses, the node will attempt to re-join + * the election, provided that its service is healthy. + * + * If the node has previously been instructed to cede active, and is still + * within the specified time period, the later command's time period will + * take precedence, resetting the timer. + * + * A call to cedeActive which specifies a 0 or negative time period will + * allow the target node to immediately rejoin the election, so long as + * it is healthy. + * + * @param millisToCede period for which the node should not attempt to + * become active + * @throws IOException if the operation fails + * @throws AccessControlException if the operation is disallowed + */ + @Idempotent + public void cedeActive(int millisToCede) + throws IOException, AccessControlException; + + /** + * Request that this node try to become active through a graceful failover. + * + * If the node is already active, this is a no-op and simply returns success + * without taking any further action. + * + * If the node is not healthy, it will throw an exception indicating that it + * is not able to become active. + * + * If the node is healthy and not active, it will try to initiate a graceful + * failover to become active, returning only when it has successfully become + * active. See {@link ZKFailoverController#gracefulFailoverToYou()} for the + * implementation details. + * + * If the node fails to successfully coordinate the failover, throws an + * exception indicating the reason for failure. 
+ * + * @throws IOException if graceful failover fails + * @throws AccessControlException if the operation is disallowed + */ + @Idempotent + public void gracefulFailover() + throws IOException, AccessControlException; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java new file mode 100644 index 00000000000..2077a86a5db --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ha; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.ZKFCProtocolService; +import org.apache.hadoop.ha.protocolPB.ZKFCProtocolPB; +import org.apache.hadoop.ha.protocolPB.ZKFCProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.authorize.PolicyProvider; + +import com.google.protobuf.BlockingService; + +@InterfaceAudience.LimitedPrivate("HDFS") +@InterfaceStability.Evolving +public class ZKFCRpcServer implements ZKFCProtocol { + + private static final int HANDLER_COUNT = 3; + private final ZKFailoverController zkfc; + private Server server; + + ZKFCRpcServer(Configuration conf, + InetSocketAddress bindAddr, + ZKFailoverController zkfc, + PolicyProvider policy) throws IOException { + this.zkfc = zkfc; + + RPC.setProtocolEngine(conf, ZKFCProtocolPB.class, + ProtobufRpcEngine.class); + ZKFCProtocolServerSideTranslatorPB translator = + new ZKFCProtocolServerSideTranslatorPB(this); + BlockingService service = ZKFCProtocolService + .newReflectiveBlockingService(translator); + this.server = RPC.getServer( + ZKFCProtocolPB.class, + service, bindAddr.getHostName(), + bindAddr.getPort(), HANDLER_COUNT, false, conf, + null /*secretManager*/); + + // set service-level authorization security policy + if (conf.getBoolean( + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { + server.refreshServiceAcl(conf, policy); + } + + } + + void start() { + this.server.start(); + } + + public InetSocketAddress getAddress() { + return server.getListenerAddress(); 
+ } + + void stopAndJoin() throws InterruptedException { + this.server.stop(); + this.server.join(); + } + + @Override + public void cedeActive(int millisToCede) throws IOException, + AccessControlException { + zkfc.checkRpcAdminAccess(); + zkfc.cedeActive(millisToCede); + } + + @Override + public void gracefulFailover() throws IOException, AccessControlException { + zkfc.checkRpcAdminAccess(); + zkfc.gracefulFailoverToYou(); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java index f87813e63f5..115eb2c726e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java @@ -18,21 +18,32 @@ package org.apache.hadoop.ha; import java.io.IOException; +import java.net.InetSocketAddress; import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; import java.util.Collections; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo; import org.apache.hadoop.ha.HealthMonitor.State; +import org.apache.hadoop.ipc.Server; +import 
org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; @@ -41,6 +52,8 @@ import org.apache.zookeeper.data.ACL; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; @InterfaceAudience.LimitedPrivate("HDFS") public abstract class ZKFailoverController implements Tool { @@ -85,6 +98,7 @@ public abstract class ZKFailoverController implements Tool { private HealthMonitor healthMonitor; private ActiveStandbyElector elector; + protected ZKFCRpcServer rpcServer; private HAServiceTarget localTarget; @@ -93,6 +107,22 @@ public abstract class ZKFailoverController implements Tool { /** Set if a fatal error occurs */ private String fatalError = null; + /** + * A future nanotime before which the ZKFC will not join the election. + * This is used during graceful failover. 
+ */ + private long delayJoiningUntilNanotime = 0; + + /** Executor on which {@link #scheduleRecheck(long)} schedules events */ + private ScheduledExecutorService delayExecutor = + Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("ZKFC Delay timer #%d") + .build()); + + private ActiveAttemptRecord lastActiveAttemptRecord; + private Object activeAttemptRecordLock = new Object(); + @Override public void setConf(Configuration conf) { this.conf = conf; @@ -104,6 +134,10 @@ public abstract class ZKFailoverController implements Tool { protected abstract HAServiceTarget getLocalTarget(); protected abstract HAServiceTarget dataToTarget(byte[] data); protected abstract void loginAsFCUser() throws IOException; + protected abstract void checkRpcAdminAccess() + throws AccessControlException, IOException; + protected abstract InetSocketAddress getRpcAddressToBindTo(); + protected abstract PolicyProvider getPolicyProvider(); /** * Return the name of a znode inside the configured parent znode in which @@ -194,10 +228,14 @@ public abstract class ZKFailoverController implements Tool { return ERR_CODE_NO_FENCER; } + initRPC(); initHM(); + startRPC(); try { mainLoop(); } finally { + rpcServer.stopAndJoin(); + elector.quitElection(true); healthMonitor.shutdown(); healthMonitor.join(); @@ -262,6 +300,16 @@ public abstract class ZKFailoverController implements Tool { healthMonitor.addCallback(new HealthCallbacks()); healthMonitor.start(); } + + protected void initRPC() throws IOException { + InetSocketAddress bindAddr = getRpcAddressToBindTo(); + rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider()); + } + + protected void startRPC() throws IOException { + rpcServer.start(); + } + private void initZK() throws HadoopIllegalArgumentException, IOException { zkQuorum = conf.get(ZK_QUORUM_KEY); @@ -328,10 +376,18 @@ public abstract class ZKFailoverController implements Tool { 
HAServiceProtocolHelper.transitionToActive(localTarget.getProxy( conf, FailoverController.getRpcTimeoutToNewActive(conf)), createReqInfo()); - LOG.info("Successfully transitioned " + localTarget + - " to active state"); + String msg = "Successfully transitioned " + localTarget + + " to active state"; + LOG.info(msg); + recordActiveAttempt(new ActiveAttemptRecord(true, msg)); + } catch (Throwable t) { - LOG.fatal("Couldn't make " + localTarget + " active", t); + String msg = "Couldn't make " + localTarget + " active"; + LOG.fatal(msg, t); + + recordActiveAttempt(new ActiveAttemptRecord(false, msg + "\n" + + StringUtils.stringifyException(t))); + if (t instanceof ServiceFailedException) { throw (ServiceFailedException)t; } else { @@ -350,6 +406,69 @@ public abstract class ZKFailoverController implements Tool { } } + /** + * Store the results of the last attempt to become active. + * This is used so that, during manually initiated failover, + * we can report back the results of the attempt to become active + * to the initiator of the failover. + */ + private void recordActiveAttempt( + ActiveAttemptRecord record) { + synchronized (activeAttemptRecordLock) { + lastActiveAttemptRecord = record; + activeAttemptRecordLock.notifyAll(); + } + } + + /** + * Wait until one of the following events: + *
 <ul> + *
 <li>Another thread publishes the results of an attempt to become active + * using {@link #recordActiveAttempt(ActiveAttemptRecord)}</li> + *
 <li>The node enters bad health status</li> + *
 <li>The specified timeout elapses</li> + *
 </ul>
+ * + * @param timeoutMillis number of millis to wait + * @return the published record, or null if the timeout elapses or the + * service becomes unhealthy + * @throws InterruptedException if the thread is interrupted. + */ + private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis) + throws InterruptedException { + long st = System.nanoTime(); + long waitUntil = st + TimeUnit.NANOSECONDS.convert( + timeoutMillis, TimeUnit.MILLISECONDS); + + do { + // periodically check health state, because entering an + // unhealthy state could prevent us from ever attempting to + // become active. We can detect this and respond to the user + // immediately. + synchronized (this) { + if (lastHealthState != State.SERVICE_HEALTHY) { + // early out if service became unhealthy + return null; + } + } + + synchronized (activeAttemptRecordLock) { + if ((lastActiveAttemptRecord != null && + lastActiveAttemptRecord.nanoTime >= st)) { + return lastActiveAttemptRecord; + } + // Only wait 1sec so that we periodically recheck the health state + // above. + activeAttemptRecordLock.wait(1000); + } + } while (System.nanoTime() < waitUntil); + + // Timeout elapsed. + LOG.warn(timeoutMillis + "ms timeout elapsed waiting for an attempt " + + "to become active"); + return null; + } + private StateChangeRequestInfo createReqInfo() { return new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC); } @@ -369,6 +488,304 @@ public abstract class ZKFailoverController implements Tool { // at the same time. 
} } + + + private synchronized void fenceOldActive(byte[] data) { + HAServiceTarget target = dataToTarget(data); + + try { + doFence(target); + } catch (Throwable t) { + recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t))); + Throwables.propagate(t); + } + } + + private void doFence(HAServiceTarget target) { + LOG.info("Should fence: " + target); + boolean gracefulWorked = new FailoverController(conf, + RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target); + if (gracefulWorked) { + // It's possible that it's in standby but just about to go into active, + // no? Is there some race here? + LOG.info("Successfully transitioned " + target + " to standby " + + "state without fencing"); + return; + } + + try { + target.checkFencingConfigured(); + } catch (BadFencingConfigurationException e) { + LOG.error("Couldn't fence old active " + target, e); + recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active")); + throw new RuntimeException(e); + } + + if (!target.getFencer().fence(target)) { + throw new RuntimeException("Unable to fence " + target); + } + } + + + /** + * Request from graceful failover to cede active role. Causes + * this ZKFC to transition its local node to standby, then quit + * the election for the specified period of time, after which it + * will rejoin iff it is healthy. 
+ */ + void cedeActive(final int millisToCede) + throws AccessControlException, ServiceFailedException, IOException { + try { + UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + doCedeActive(millisToCede); + return null; + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private void doCedeActive(int millisToCede) + throws AccessControlException, ServiceFailedException, IOException { + int timeout = FailoverController.getGracefulFenceTimeout(conf); + + // Lock elector to maintain lock ordering of elector -> ZKFC + synchronized (elector) { + synchronized (this) { + if (millisToCede <= 0) { + delayJoiningUntilNanotime = 0; + recheckElectability(); + return; + } + + LOG.info("Requested by " + UserGroupInformation.getCurrentUser() + + " at " + Server.getRemoteAddress() + " to cede active role."); + boolean needFence = false; + try { + localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo()); + LOG.info("Successfully ensured local node is in standby mode"); + } catch (IOException ioe) { + LOG.warn("Unable to transition local node to standby: " + + ioe.getLocalizedMessage()); + LOG.warn("Quitting election but indicating that fencing is " + + "necessary"); + needFence = true; + } + delayJoiningUntilNanotime = System.nanoTime() + + TimeUnit.MILLISECONDS.toNanos(millisToCede); + elector.quitElection(needFence); + } + } + recheckElectability(); + } + + /** + * Coordinate a graceful failover to this node. 
+ * @throws ServiceFailedException if the node fails to become active + * @throws IOException some other error occurs + */ + void gracefulFailoverToYou() throws ServiceFailedException, IOException { + try { + UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + doGracefulFailover(); + return null; + } + + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + /** + * Coordinate a graceful failover. This proceeds in several phases: + * 1) Pre-flight checks: ensure that the local node is healthy, and + * thus a candidate for failover. + * 2) Determine the current active node. If it is the local node, no + * need to failover - return success. + * 3) Ask that node to yield from the election for a number of seconds. + * 4) Allow the normal election path to run in other threads. Wait until + * we either become unhealthy or we see an election attempt recorded by + * the normal code path. + * 5) Allow the old active to rejoin the election, so a future + * failback is possible. + */ + private void doGracefulFailover() + throws ServiceFailedException, IOException, InterruptedException { + int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2; + + // Phase 1: pre-flight checks + checkEligibleForFailover(); + + // Phase 2: determine old/current active node. Check that we're not + // ourselves active, etc. + HAServiceTarget oldActive = getCurrentActive(); + if (oldActive == null) { + // No node is currently active. So, if we aren't already + // active ourselves by means of a normal election, then there's + // probably something preventing us from becoming active. + throw new ServiceFailedException( + "No other node is currently active."); + } + + if (oldActive.getAddress().equals(localTarget.getAddress())) { + LOG.info("Local node " + localTarget + " is already active. " + + "No need to failover. 
Returning success."); + return; + } + + // Phase 3: ask the old active to yield from the election. + LOG.info("Asking " + oldActive + " to cede its active state for " + + timeout + "ms"); + ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout); + oldZkfc.cedeActive(timeout); + + // Phase 4: wait for the normal election to make the local node + // active. + ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000); + + if (attempt == null) { + // We didn't even make an attempt to become active. + synchronized(this) { + if (lastHealthState != State.SERVICE_HEALTHY) { + throw new ServiceFailedException("Unable to become active. " + + "Service became unhealthy while trying to failover."); + } + } + + throw new ServiceFailedException("Unable to become active. " + + "Local node did not get an opportunity to do so from ZooKeeper, " + + "or the local node took too long to transition to active."); + } + + // Phase 5. At this point, we made some attempt to become active. So we + // can tell the old active to rejoin if it wants. This allows a quick + // fail-back if we immediately crash. + oldZkfc.cedeActive(-1); + + if (attempt.succeeded) { + LOG.info("Successfully became active. " + attempt.status); + } else { + // Propagate failure + String msg = "Failed to become active. " + attempt.status; + throw new ServiceFailedException(msg); + } + } + + /** + * Ensure that the local node is in a healthy state, and thus + * eligible for graceful failover. + * @throws ServiceFailedException if the node is unhealthy + */ + private synchronized void checkEligibleForFailover() + throws ServiceFailedException { + // Check health + if (this.getLastHealthState() != State.SERVICE_HEALTHY) { + throw new ServiceFailedException( + localTarget + " is not currently healthy. " + + "Cannot be failover target"); + } + } + + /** + * @return an {@link HAServiceTarget} for the current active node + * in the cluster, or null if no node is active. 
+ * @throws IOException if a ZK-related issue occurs + * @throws InterruptedException if thread is interrupted + */ + private HAServiceTarget getCurrentActive() + throws IOException, InterruptedException { + synchronized (elector) { + synchronized (this) { + byte[] activeData; + try { + activeData = elector.getActiveData(); + } catch (ActiveNotFoundException e) { + return null; + } catch (KeeperException ke) { + throw new IOException( + "Unexpected ZooKeeper issue fetching active node info", ke); + } + + HAServiceTarget oldActive = dataToTarget(activeData); + return oldActive; + } + } + } + + /** + * Check the current state of the service, and join the election + * if it should be in the election. + */ + private void recheckElectability() { + // Maintain lock ordering of elector -> ZKFC + synchronized (elector) { + synchronized (this) { + boolean healthy = lastHealthState == State.SERVICE_HEALTHY; + + long remainingDelay = delayJoiningUntilNanotime - System.nanoTime(); + if (remainingDelay > 0) { + if (healthy) { + LOG.info("Would have joined master election, but this node is " + + "prohibited from doing so for " + + TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms"); + } + scheduleRecheck(remainingDelay); + return; + } + + switch (lastHealthState) { + case SERVICE_HEALTHY: + elector.joinElection(targetToData(localTarget)); + break; + + case INITIALIZING: + LOG.info("Ensuring that " + localTarget + " does not " + + "participate in active master election"); + elector.quitElection(false); + break; + + case SERVICE_UNHEALTHY: + case SERVICE_NOT_RESPONDING: + LOG.info("Quitting master election for " + localTarget + + " and marking that fencing is necessary"); + elector.quitElection(true); + break; + + case HEALTH_MONITOR_FAILED: + fatalError("Health monitor failed!"); + break; + + default: + throw new IllegalArgumentException("Unhandled state:" + lastHealthState); + } + } + } + } + + /** + * Schedule a call to {@link #recheckElectability()} in the future. 
+ */ + private void scheduleRecheck(long whenNanos) { + delayExecutor.schedule( + new Runnable() { + @Override + public void run() { + try { + recheckElectability(); + } catch (Throwable t) { + fatalError("Failed to recheck electability: " + + StringUtils.stringifyException(t)); + } + } + }, + whenNanos, TimeUnit.NANOSECONDS); + } /** * @return the last health state passed to the FC @@ -383,6 +800,11 @@ public abstract class ZKFailoverController implements Tool { ActiveStandbyElector getElectorForTests() { return elector; } + + @VisibleForTesting + ZKFCRpcServer getRpcServerForTests() { + return rpcServer; + } /** * Callbacks from elector @@ -409,32 +831,7 @@ public abstract class ZKFailoverController implements Tool { @Override public void fenceOldActive(byte[] data) { - HAServiceTarget target = dataToTarget(data); - - LOG.info("Should fence: " + target); - boolean gracefulWorked = new FailoverController(conf, - RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target); - if (gracefulWorked) { - // It's possible that it's in standby but just about to go into active, - // no? Is there some race here? - LOG.info("Successfully transitioned " + target + " to standby " + - "state without fencing"); - return; - } - - try { - target.checkFencingConfigured(); - } catch (BadFencingConfigurationException e) { - LOG.error("Couldn't fence old active " + target, e); - // TODO: see below todo - throw new RuntimeException(e); - } - - if (!target.getFencer().fence(target)) { - // TODO: this will end up in some kind of tight loop, - // won't it? 
We need some kind of backoff - throw new RuntimeException("Unable to fence " + target); - } + ZKFailoverController.this.fenceOldActive(data); } @Override @@ -451,34 +848,21 @@ public abstract class ZKFailoverController implements Tool { public void enteredState(HealthMonitor.State newState) { LOG.info("Local service " + localTarget + " entered state: " + newState); - switch (newState) { - case SERVICE_HEALTHY: - LOG.info("Joining master election for " + localTarget); - elector.joinElection(targetToData(localTarget)); - break; - - case INITIALIZING: - LOG.info("Ensuring that " + localTarget + " does not " + - "participate in active master election"); - elector.quitElection(false); - break; - - case SERVICE_UNHEALTHY: - case SERVICE_NOT_RESPONDING: - LOG.info("Quitting master election for " + localTarget + - " and marking that fencing is necessary"); - elector.quitElection(true); - break; - - case HEALTH_MONITOR_FAILED: - fatalError("Health monitor failed!"); - break; - - default: - throw new IllegalArgumentException("Unhandled state:" + newState); - } - lastHealthState = newState; + recheckElectability(); } } + + private static class ActiveAttemptRecord { + private final boolean succeeded; + private final String status; + private final long nanoTime; + + public ActiveAttemptRecord(boolean succeeded, String status) { + this.succeeded = succeeded; + this.status = status; + this.nanoTime = System.nanoTime(); + } + } + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java new file mode 100644 index 00000000000..62896fa8e74 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ha.protocolPB; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import javax.net.SocketFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ZKFCProtocol; +import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveRequestProto; +import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverRequestProto; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + + +public class ZKFCProtocolClientSideTranslatorPB implements + ZKFCProtocol, Closeable, ProtocolTranslator { + + private final static RpcController NULL_CONTROLLER = null; + private final ZKFCProtocolPB rpcProxy; + + public ZKFCProtocolClientSideTranslatorPB( + InetSocketAddress addr, Configuration conf, + SocketFactory socketFactory, int timeout) throws IOException { + RPC.setProtocolEngine(conf, ZKFCProtocolPB.class, + 
ProtobufRpcEngine.class); + rpcProxy = RPC.getProxy(ZKFCProtocolPB.class, + RPC.getProtocolVersion(ZKFCProtocolPB.class), addr, + UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout); + } + + @Override + public void cedeActive(int millisToCede) throws IOException, + AccessControlException { + try { + CedeActiveRequestProto req = CedeActiveRequestProto.newBuilder() + .setMillisToCede(millisToCede) + .build(); + rpcProxy.cedeActive(NULL_CONTROLLER, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void gracefulFailover() throws IOException, AccessControlException { + try { + rpcProxy.gracefulFailover(NULL_CONTROLLER, + GracefulFailoverRequestProto.getDefaultInstance()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } + + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java new file mode 100644 index 00000000000..348004fc045 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ha.protocolPB; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.ZKFCProtocolService; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.security.KerberosInfo; + +@KerberosInfo( + serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY) +@ProtocolInfo(protocolName = "org.apache.hadoop.ha.ZKFCProtocol", + protocolVersion = 1) +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface ZKFCProtocolPB extends + ZKFCProtocolService.BlockingInterface, VersionedProtocol { + /** + * If any methods need annotation, it can be added here + */ +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java new file mode 100644 index 00000000000..549499885df --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ha.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ha.ZKFCProtocol; +import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveRequestProto; +import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveResponseProto; +import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverRequestProto; +import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverResponseProto; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ZKFCProtocolServerSideTranslatorPB implements + ZKFCProtocolPB { + private final ZKFCProtocol server; + + public ZKFCProtocolServerSideTranslatorPB(ZKFCProtocol server) { + this.server = server; + } + + @Override + public CedeActiveResponseProto cedeActive(RpcController controller, + CedeActiveRequestProto request) throws ServiceException { + try { + server.cedeActive(request.getMillisToCede()); + return CedeActiveResponseProto.getDefaultInstance(); + } catch (IOException e) { 
+ throw new ServiceException(e); + } + } + + @Override + public GracefulFailoverResponseProto gracefulFailover( + RpcController controller, GracefulFailoverRequestProto request) + throws ServiceException { + try { + server.gracefulFailover(); + return GracefulFailoverResponseProto.getDefaultInstance(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return RPC.getProtocolVersion(ZKFCProtocolPB.class); + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + if (!protocol.equals(RPC.getProtocolName(ZKFCProtocolPB.class))) { + throw new IOException("Serverside implements " + + RPC.getProtocolName(ZKFCProtocolPB.class) + + ". The following requested protocol is unknown: " + protocol); + } + + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + RPC.getProtocolVersion(ZKFCProtocolPB.class), + ZKFCProtocolPB.class); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml b/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml index 2fd9f8d2a98..131fecfc007 100644 --- a/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml +++ b/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml @@ -223,6 +223,12 @@ ACL for HAService protocol used by HAAdmin to manage the active and stand-by states of namenode.
+ <property> + <name>security.zkfc.protocol.acl</name> + <value>*</value> + <description>ACL for access to the ZK Failover Controller + </description> + </property> <property> <name>security.mrhs.client.protocol.acl</name> diff --git a/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto b/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto new file mode 100644 index 00000000000..1037b028ce6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +option java_package = "org.apache.hadoop.ha.proto"; +option java_outer_classname = "ZKFCProtocolProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +message CedeActiveRequestProto { + required uint32 millisToCede = 1; +} + +message CedeActiveResponseProto { +} + +message GracefulFailoverRequestProto { +} + +message GracefulFailoverResponseProto { +} + + +/** + * Protocol provides manual control of the ZK Failover Controllers + */ +service ZKFCProtocolService { + /** + * Request that the service cede its active state, and quit the election + * for some amount of time + */ + rpc cedeActive(CedeActiveRequestProto) + returns(CedeActiveResponseProto); + + + rpc gracefulFailover(GracefulFailoverRequestProto) + returns(GracefulFailoverResponseProto); +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java index c3ff1cf6245..c38bc534245 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java @@ -40,13 +40,15 @@ class DummyHAService extends HAServiceTarget { private static final String DUMMY_FENCE_KEY = "dummy.fence.key"; volatile HAServiceState state; HAServiceProtocol proxy; + ZKFCProtocol zkfcProxy = null; NodeFencer fencer; InetSocketAddress address; boolean isHealthy = true; boolean actUnreachable = false; - boolean failToBecomeActive; + boolean failToBecomeActive, failToBecomeStandby, failToFence; DummySharedResource sharedResource; + public int fenceCount = 0; static ArrayList instances = Lists.newArrayList(); int index; @@ -82,12 +84,24 @@ class DummyHAService extends HAServiceTarget { return address; } + @Override + public InetSocketAddress getZKFCAddress() { + return null; + } + @Override public HAServiceProtocol 
getProxy(Configuration conf, int timeout) throws IOException { return proxy; } + @Override + public ZKFCProtocol getZKFCProxy(Configuration conf, int timeout) + throws IOException { + assert zkfcProxy != null; + return zkfcProxy; + } + @Override public NodeFencer getFencer() { return fencer; @@ -139,6 +153,9 @@ class DummyHAService extends HAServiceTarget { public void transitionToStandby(StateChangeRequestInfo req) throws ServiceFailedException, AccessControlException, IOException { checkUnreachable(); + if (failToBecomeStandby) { + throw new ServiceFailedException("injected failure"); + } if (sharedResource != null) { sharedResource.release(DummyHAService.this); } @@ -167,7 +184,6 @@ class DummyHAService extends HAServiceTarget { } public static class DummyFencer implements FenceMethod { - public void checkArgs(String args) throws BadFencingConfigurationException { } @@ -176,6 +192,13 @@ class DummyHAService extends HAServiceTarget { throws BadFencingConfigurationException { LOG.info("tryFence(" + target + ")"); DummyHAService svc = (DummyHAService)target; + synchronized (svc) { + svc.fenceCount++; + } + if (svc.failToFence) { + LOG.info("Injected failure to fence"); + return false; + } svc.sharedResource.release(svc); return true; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java index be72a410d58..7ebca62860c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java @@ -29,6 +29,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HealthMonitor.State; +import org.apache.hadoop.security.AccessControlException; +import 
org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -126,6 +128,10 @@ public class MiniZKFCCluster { public ActiveStandbyElector getElector(int i) { return thrs[i].zkfc.getElectorForTests(); } + + public DummyZKFC getZkfc(int i) { + return thrs[i].zkfc; + } public void setHealthy(int idx, boolean healthy) { svcs[idx].isHealthy = healthy; @@ -134,6 +140,14 @@ public class MiniZKFCCluster { public void setFailToBecomeActive(int idx, boolean doFail) { svcs[idx].failToBecomeActive = doFail; } + + public void setFailToBecomeStandby(int idx, boolean doFail) { + svcs[idx].failToBecomeStandby = doFail; + } + + public void setFailToFence(int idx, boolean doFail) { + svcs[idx].failToFence = doFail; + } public void setUnreachable(int idx, boolean unreachable) { svcs[idx].actUnreachable = unreachable; @@ -290,5 +304,25 @@ public class MiniZKFCCluster { protected String getScopeInsideParentNode() { return DUMMY_CLUSTER; } + + @Override + protected void checkRpcAdminAccess() throws AccessControlException { + } + + @Override + protected InetSocketAddress getRpcAddressToBindTo() { + return new InetSocketAddress(0); + } + + @Override + protected void initRPC() throws IOException { + super.initRPC(); + localTarget.zkfcProxy = this.getRpcServerForTests(); + } + + @Override + protected PolicyProvider getPolicyProvider() { + return null; + } } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java index c010c3d3b0f..ffce17e666f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java +++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java @@ -27,6 +27,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.HealthMonitor.State; import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; @@ -378,6 +379,205 @@ public class TestZKFailoverController extends ClientBaseWithFixes { cluster.stop(); } } + + /** + * Test that the ZKFC can gracefully cede its active status. + */ + @Test(timeout=15000) + public void testCedeActive() throws Exception { + try { + cluster.start(); + DummyZKFC zkfc = cluster.getZkfc(0); + // It should be in active to start. + assertEquals(ActiveStandbyElector.State.ACTIVE, + zkfc.getElectorForTests().getStateForTests()); + + // Ask it to cede active for 3 seconds. It should respond promptly + // (i.e. the RPC itself should not take 3 seconds!) + ZKFCProtocol proxy = zkfc.getLocalTarget().getZKFCProxy(conf, 5000); + long st = System.currentTimeMillis(); + proxy.cedeActive(3000); + long et = System.currentTimeMillis(); + assertTrue("RPC to cedeActive took " + (et - st) + " ms", + et - st < 1000); + + // Should be in "INIT" state since it's not in the election + // at this point. + assertEquals(ActiveStandbyElector.State.INIT, + zkfc.getElectorForTests().getStateForTests()); + + // After the prescribed 3 seconds, should go into STANDBY state, + // since the other node in the cluster would have taken ACTIVE. + cluster.waitForElectorState(0, ActiveStandbyElector.State.STANDBY); + long et2 = System.currentTimeMillis(); + assertTrue("Should take ~3 seconds to rejoin. 
Only took " + (et2 - et) + + "ms before rejoining.", + et2 - et > 2800); + } finally { + cluster.stop(); + } + } + + @Test(timeout=15000) + public void testGracefulFailover() throws Exception { + try { + cluster.start(); + + cluster.waitForActiveLockHolder(0); + cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover(); + cluster.waitForActiveLockHolder(1); + cluster.getService(0).getZKFCProxy(conf, 5000).gracefulFailover(); + cluster.waitForActiveLockHolder(0); + + assertEquals(0, cluster.getService(0).fenceCount); + assertEquals(0, cluster.getService(1).fenceCount); + } finally { + cluster.stop(); + } + } + + @Test(timeout=15000) + public void testGracefulFailoverToUnhealthy() throws Exception { + try { + cluster.start(); + + cluster.waitForActiveLockHolder(0); + + // Mark it unhealthy, wait for it to exit election + cluster.setHealthy(1, false); + cluster.waitForElectorState(1, ActiveStandbyElector.State.INIT); + + // Ask for failover, it should fail, because it's unhealthy + try { + cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover(); + fail("Did not fail to graceful failover to unhealthy service!"); + } catch (ServiceFailedException sfe) { + GenericTestUtils.assertExceptionContains( + cluster.getService(1).toString() + + " is not currently healthy.", sfe); + } + } finally { + cluster.stop(); + } + } + + @Test(timeout=15000) + public void testGracefulFailoverFailBecomingActive() throws Exception { + try { + cluster.start(); + + cluster.waitForActiveLockHolder(0); + cluster.setFailToBecomeActive(1, true); + + // Ask for failover, it should fail and report back to user. 
+ try { + cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover(); + fail("Did not fail to graceful failover when target failed " + + "to become active!"); + } catch (ServiceFailedException sfe) { + GenericTestUtils.assertExceptionContains( + "Couldn't make " + cluster.getService(1) + " active", sfe); + GenericTestUtils.assertExceptionContains( + "injected failure", sfe); + } + + // No fencing + assertEquals(0, cluster.getService(0).fenceCount); + assertEquals(0, cluster.getService(1).fenceCount); + + // Service 0 should go back to being active after the failed failover + cluster.waitForActiveLockHolder(0); + } finally { + cluster.stop(); + } + } + + @Test(timeout=15000) + public void testGracefulFailoverFailBecomingStandby() throws Exception { + try { + cluster.start(); + + cluster.waitForActiveLockHolder(0); + + // Ask for failover when old node fails to transition to standby. + // This should trigger fencing, since the cedeActive() command + // still works, but leaves the breadcrumb in place. + cluster.setFailToBecomeStandby(0, true); + cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover(); + + // Check that the old node was fenced + assertEquals(1, cluster.getService(0).fenceCount); + } finally { + cluster.stop(); + } + } + + @Test(timeout=15000) + public void testGracefulFailoverFailBecomingStandbyAndFailFence() + throws Exception { + try { + cluster.start(); + + cluster.waitForActiveLockHolder(0); + + // Ask for failover when old node fails to transition to standby. + // This should trigger fencing, since the cedeActive() command + // still works, but leaves the breadcrumb in place. 
+ cluster.setFailToBecomeStandby(0, true); + cluster.setFailToFence(0, true); + + try { + cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover(); + fail("Failover should have failed when old node wont fence"); + } catch (ServiceFailedException sfe) { + GenericTestUtils.assertExceptionContains( + "Unable to fence " + cluster.getService(0), sfe); + } + } finally { + cluster.stop(); + } + } + + /** + * Test which exercises all of the inputs into ZKFC. This is particularly + * useful for running under jcarder to check for lock order violations. + */ + @Test(timeout=30000) + public void testOneOfEverything() throws Exception { + try { + cluster.start(); + + // Failover by session expiration + LOG.info("====== Failing over by session expiration"); + cluster.expireAndVerifyFailover(0, 1); + cluster.expireAndVerifyFailover(1, 0); + + // Restart ZK + LOG.info("====== Restarting server"); + stopServer(); + waitForServerDown(hostPort, CONNECTION_TIMEOUT); + startServer(); + waitForServerUp(hostPort, CONNECTION_TIMEOUT); + + // Failover by bad health + cluster.setHealthy(0, false); + cluster.waitForHAState(0, HAServiceState.STANDBY); + cluster.waitForHAState(1, HAServiceState.ACTIVE); + cluster.setHealthy(1, true); + cluster.setHealthy(0, false); + cluster.waitForHAState(1, HAServiceState.ACTIVE); + cluster.waitForHAState(0, HAServiceState.STANDBY); + cluster.setHealthy(0, true); + + cluster.waitForHealthState(0, State.SERVICE_HEALTHY); + + // Graceful failovers + cluster.getZkfc(1).gracefulFailoverToYou(); + cluster.getZkfc(0).gracefulFailoverToYou(); + } finally { + cluster.stop(); + } + } private int runFC(DummyHAService target, String ... 
args) throws Exception { DummyZKFC zkfc = new DummyZKFC(target); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 83f2855e0f0..7e50f272cd1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -352,4 +352,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods"; public static final String DFS_HA_AUTO_FAILOVER_ENABLED_KEY = "dfs.ha.automatic-failover.enabled"; public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false; + public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port"; + public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java index 6e212458d07..e8e80a824e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.ZKFCProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -47,6 +48,8 @@ public class HDFSPolicyProvider extends PolicyProvider { new Service("security.namenode.protocol.acl", NamenodeProtocol.class), new 
Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL, HAServiceProtocol.class), + new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL, + ZKFCProtocol.class), new Service( CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_POLICY, RefreshAuthorizationPolicyProtocol.class), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 3c56bd9af56..c1aff665279 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -181,7 +181,8 @@ public class NameNode { DFS_NAMENODE_BACKUP_ADDRESS_KEY, DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY, - DFS_HA_FENCE_METHODS_KEY + DFS_HA_FENCE_METHODS_KEY, + DFS_HA_ZKFC_PORT_KEY }; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java index 87422a36c7e..28f0e9f4543 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java @@ -30,11 +30,18 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceTarget; import org.apache.hadoop.ha.ZKFailoverController; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.server.namenode.NameNode; import 
org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos.ActiveNodeInfo; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; @@ -48,6 +55,7 @@ public class DFSZKFailoverController extends ZKFailoverController { LogFactory.getLog(DFSZKFailoverController.class); private NNHAServiceTarget localTarget; private Configuration localNNConf; + private AccessControlList adminAcl; @Override protected HAServiceTarget dataToTarget(byte[] data) { @@ -68,21 +76,43 @@ public class DFSZKFailoverController extends ZKFailoverController { ret + ": Stored protobuf was " + proto + ", address from our own " + "configuration for this NameNode was " + ret.getAddress()); } + + ret.setZkfcPort(proto.getZkfcPort()); return ret; } @Override protected byte[] targetToData(HAServiceTarget target) { InetSocketAddress addr = target.getAddress(); + return ActiveNodeInfo.newBuilder() .setHostname(addr.getHostName()) .setPort(addr.getPort()) + .setZkfcPort(target.getZKFCAddress().getPort()) .setNameserviceId(localTarget.getNameServiceId()) .setNamenodeId(localTarget.getNameNodeId()) .build() .toByteArray(); } + @Override + protected InetSocketAddress getRpcAddressToBindTo() { + int zkfcPort = getZkfcPort(localNNConf); + return new InetSocketAddress(localTarget.getAddress().getAddress(), + zkfcPort); + } + + + @Override + protected PolicyProvider getPolicyProvider() { + return new HDFSPolicyProvider(); + } + + static int getZkfcPort(Configuration conf) { + return conf.getInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY, + DFSConfigKeys.DFS_HA_ZKFC_PORT_DEFAULT); + } + @Override public void setConf(Configuration conf) { localNNConf = 
DFSHAAdmin.addSecurityConfiguration(conf); @@ -98,10 +128,21 @@ public class DFSZKFailoverController extends ZKFailoverController { localTarget = new NNHAServiceTarget(localNNConf, nsId, nnId); + // Setup ACLs + adminAcl = new AccessControlList( + conf.get(DFSConfigKeys.DFS_ADMIN, " ")); + super.setConf(localNNConf); LOG.info("Failover controller configured for NameNode " + nsId + "." + nnId); } + + + @Override + protected void initRPC() throws IOException { + super.initRPC(); + localTarget.setZkfcPort(rpcServer.getAddress().getPort()); + } @Override public HAServiceTarget getLocalTarget() { @@ -127,4 +168,19 @@ public class DFSZKFailoverController extends ZKFailoverController { System.exit(ToolRunner.run( new DFSZKFailoverController(), args)); } + + @Override + protected void checkRpcAdminAccess() throws IOException, AccessControlException { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + UserGroupInformation zkfcUgi = UserGroupInformation.getLoginUser(); + if (adminAcl.isUserAllowed(ugi) || + ugi.getShortUserName().equals(zkfcUgi.getShortUserName())) { + LOG.info("Allowed RPC access from " + ugi + " at " + Server.getRemoteAddress()); + return; + } + String msg = "Disallowed RPC access from " + ugi + " at " + + Server.getRemoteAddress() + ". 
Not listed in " + DFSConfigKeys.DFS_ADMIN; + LOG.warn(msg); + throw new AccessControlException(msg); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java index ab76ba0609e..38f5123de26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java @@ -45,12 +45,13 @@ public class NNHAServiceTarget extends HAServiceTarget { private static final String NAMENODE_ID_KEY = "namenodeid"; private final InetSocketAddress addr; + private InetSocketAddress zkfcAddr; private NodeFencer fencer; private BadFencingConfigurationException fenceConfigError; private final String nnId; private final String nsId; private final boolean autoFailoverEnabled; - + public NNHAServiceTarget(Configuration conf, String nsId, String nnId) { Preconditions.checkNotNull(nnId); @@ -77,17 +78,26 @@ public class NNHAServiceTarget extends HAServiceTarget { } this.addr = NetUtils.createSocketAddr(serviceAddr, NameNode.DEFAULT_PORT); + + this.autoFailoverEnabled = targetConf.getBoolean( + DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, + DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT); + if (autoFailoverEnabled) { + int port = DFSZKFailoverController.getZkfcPort(targetConf); + if (port != 0) { + setZkfcPort(port); + } + } + try { this.fencer = NodeFencer.create(targetConf, DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY); } catch (BadFencingConfigurationException e) { this.fenceConfigError = e; } + this.nnId = nnId; this.nsId = nsId; - this.autoFailoverEnabled = targetConf.getBoolean( - DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, - DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT); } /** @@ -98,6 +108,21 @@ public class NNHAServiceTarget extends HAServiceTarget { return addr; } + @Override + 
public InetSocketAddress getZKFCAddress() { + Preconditions.checkState(autoFailoverEnabled, + "ZKFC address not relevant when auto failover is off"); + assert zkfcAddr != null; + + return zkfcAddr; + } + + void setZkfcPort(int port) { + assert autoFailoverEnabled; + + this.zkfcAddr = new InetSocketAddress(addr.getAddress(), port); + } + @Override public void checkFencingConfigured() throws BadFencingConfigurationException { if (fenceConfigError != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HAZKInfo.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HAZKInfo.proto index 7836dbe5f49..364c5bd2ef4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HAZKInfo.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HAZKInfo.proto @@ -24,4 +24,5 @@ message ActiveNodeInfo { required string hostname = 3; required int32 port = 4; + required int32 zkfcPort = 5; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java index 117c7127edf..bdb8b87ab6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java @@ -22,10 +22,10 @@ import static org.junit.Assert.*; import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.ha.ClientBaseWithFixes; -import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.ha.ZKFailoverController; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import 
org.apache.hadoop.ha.TestNodeFencer.AlwaysSucceedFencer; @@ -62,6 +62,15 @@ public class TestDFSZKFailoverController extends ClientBaseWithFixes { AlwaysSucceedFencer.class.getName()); conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true); + // Turn off IPC client caching, so that the suite can handle + // the restart of the daemons between test cases. + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, + 0); + + conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn1", 10003); + conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn2", 10004); + MiniDFSNNTopology topology = new MiniDFSNNTopology() .addNameservice(new MiniDFSNNTopology.NSConf("ns1") .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001)) @@ -100,18 +109,6 @@ public class TestDFSZKFailoverController extends ClientBaseWithFixes { } } - /** - * Test that, when automatic failover is enabled, the manual - * failover script refuses to run. - */ - @Test(timeout=10000) - public void testManualFailoverIsDisabled() throws Exception { - DFSHAAdmin admin = new DFSHAAdmin(); - admin.setConf(conf); - int rc = admin.run(new String[]{"-failover", "nn1", "nn2"}); - assertEquals(-1, rc); - } - /** * Test that automatic failover is triggered by shutting the * active NN down. 
@@ -148,6 +145,29 @@ public class TestDFSZKFailoverController extends ClientBaseWithFixes { thr2.zkfc.getLocalTarget().getAddress()); } + @Test(timeout=30000) + public void testManualFailover() throws Exception { + thr2.zkfc.getLocalTarget().getZKFCProxy(conf, 15000).gracefulFailover(); + waitForHAState(0, HAServiceState.STANDBY); + waitForHAState(1, HAServiceState.ACTIVE); + + thr1.zkfc.getLocalTarget().getZKFCProxy(conf, 15000).gracefulFailover(); + waitForHAState(0, HAServiceState.ACTIVE); + waitForHAState(1, HAServiceState.STANDBY); + } + + @Test(timeout=30000) + public void testManualFailoverWithDFSHAAdmin() throws Exception { + DFSHAAdmin tool = new DFSHAAdmin(); + tool.setConf(conf); + tool.run(new String[]{"-failover", "nn1", "nn2"}); + waitForHAState(0, HAServiceState.STANDBY); + waitForHAState(1, HAServiceState.ACTIVE); + tool.run(new String[]{"-failover", "nn2", "nn1"}); + waitForHAState(0, HAServiceState.ACTIVE); + waitForHAState(1, HAServiceState.STANDBY); + } + private void waitForHAState(int nnidx, final HAServiceState state) throws TimeoutException, InterruptedException { final NameNode nn = cluster.getNameNode(nnidx); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSHAAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSHAAdmin.java index 3570e4dbafc..8b0fd0ee768 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSHAAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSHAAdmin.java @@ -38,7 +38,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.ha.HAServiceTarget; import org.apache.hadoop.ha.HealthCheckFailedException; -import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.ha.ZKFCProtocol; import org.apache.hadoop.test.MockitoUtil; import org.junit.Before; @@ 
-56,6 +56,7 @@ public class TestDFSHAAdmin { private ByteArrayOutputStream errOutBytes = new ByteArrayOutputStream(); private String errOutput; private HAServiceProtocol mockProtocol; + private ZKFCProtocol mockZkfcProtocol; private static final String NSID = "ns1"; @@ -88,6 +89,7 @@ public class TestDFSHAAdmin { @Before public void setup() throws IOException { mockProtocol = MockitoUtil.mockProtocol(HAServiceProtocol.class); + mockZkfcProtocol = MockitoUtil.mockProtocol(ZKFCProtocol.class); tool = new DFSHAAdmin() { @Override @@ -97,7 +99,9 @@ public class TestDFSHAAdmin { // OVerride the target to return our mock protocol try { Mockito.doReturn(mockProtocol).when(spy).getProxy( - Mockito.any(), Mockito.anyInt()); + Mockito.any(), Mockito.anyInt()); + Mockito.doReturn(mockZkfcProtocol).when(spy).getZKFCProxy( + Mockito.any(), Mockito.anyInt()); } catch (IOException e) { throw new AssertionError(e); // mock setup doesn't really throw } @@ -172,8 +176,6 @@ public class TestDFSHAAdmin { assertTrue(errOutput.contains("Refusing to manually manage")); assertEquals(-1, runTool("-transitionToStandby", "nn1")); assertTrue(errOutput.contains("Refusing to manually manage")); - assertEquals(-1, runTool("-failover", "nn1", "nn2")); - assertTrue(errOutput.contains("Refusing to manually manage")); Mockito.verify(mockProtocol, Mockito.never()) .transitionToActive(anyReqInfo()); @@ -186,12 +188,10 @@ public class TestDFSHAAdmin { assertEquals(0, runTool("-transitionToActive", "-forcemanual", "nn1")); setupConfirmationOnSystemIn(); assertEquals(0, runTool("-transitionToStandby", "-forcemanual", "nn1")); - setupConfirmationOnSystemIn(); - assertEquals(0, runTool("-failover", "-forcemanual", "nn1", "nn2")); - Mockito.verify(mockProtocol, Mockito.times(2)).transitionToActive( + Mockito.verify(mockProtocol, Mockito.times(1)).transitionToActive( reqInfoCaptor.capture()); - Mockito.verify(mockProtocol, Mockito.times(2)).transitionToStandby( + Mockito.verify(mockProtocol, 
Mockito.times(1)).transitionToStandby( reqInfoCaptor.capture()); // All of the RPCs should have had the "force" source @@ -300,6 +300,19 @@ public class TestDFSHAAdmin { tool.setConf(conf); assertEquals(-1, runTool("-failover", "nn1", "nn2", "--forcefence")); } + + @Test + public void testFailoverWithAutoHa() throws Exception { + Mockito.doReturn(STANDBY_READY_RESULT).when(mockProtocol).getServiceStatus(); + // Turn on auto-HA in the config + HdfsConfiguration conf = getHAConf(); + conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true); + conf.set(DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY, "shell(true)"); + tool.setConf(conf); + + assertEquals(0, runTool("-failover", "nn1", "nn2")); + Mockito.verify(mockZkfcProtocol).gracefulFailover(); + } @Test public void testForceFenceOptionListedBeforeArgs() throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-policy.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-policy.xml index eb3f4bd7447..afbf420df0c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-policy.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-policy.xml @@ -116,5 +116,11 @@ ACL for HAService protocol used by HAAdmin to manage the active and stand-by states of namenode. + + security.zkfc.protocol.acl + * + ACL for access to the ZK Failover Controller + + diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/HDFSHighAvailability.apt.vm b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/HDFSHighAvailability.apt.vm index 1cf75cd0f1a..d78b75bcd03 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/HDFSHighAvailability.apt.vm +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/HDFSHighAvailability.apt.vm @@ -710,6 +710,6 @@ digest:hdfs-zkfcs:vlUvLnd8MlacsE80rDuu6ONESbM=:rwcda * <> - Currently, this facility is not yet implemented. 
Instead, you may simply stop - the active NameNode daemon. This will trigger an automatic failover. This - process will be improved in future versions. \ No newline at end of file + Even if automatic failover is configured, you may initiate a manual failover + using the same <<<hdfs haadmin>>> command. It will perform a coordinated + failover. \ No newline at end of file