diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 14f0c6db07e..c1df0b1efb6 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -476,6 +476,9 @@ Release 2.0.3-alpha - Unreleased HADOOP-6762. Exception while doing RPC I/O closes channel (Sam Rash and todd via todd) + HADOOP-9126. FormatZK and ZKFC startup can fail due to zkclient connection + establishment delay. (Rakesh R and todd via todd) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java index 7a2a5f1ebab..bb4d4b5809c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java @@ -21,6 +21,8 @@ package org.apache.hadoop.ha; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -45,6 +47,7 @@ import org.apache.zookeeper.KeeperException.Code; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; /** * @@ -205,7 +208,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback { int zookeeperSessionTimeout, String parentZnodeName, List acl, List authInfo, ActiveStandbyElectorCallback app) throws IOException, - HadoopIllegalArgumentException { + HadoopIllegalArgumentException, KeeperException { if (app == null || acl == null || parentZnodeName == null || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) { throw new HadoopIllegalArgumentException("Invalid argument"); @@ -602,10 +605,24 @@ public class ActiveStandbyElector implements StatCallback, StringCallback { * * @return new zookeeper client instance * @throws IOException + * @throws KeeperException zookeeper connectionloss exception */ - protected synchronized ZooKeeper getNewZooKeeper() throws IOException { - ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null); - zk.register(new WatcherWithClientRef(zk)); + protected synchronized ZooKeeper getNewZooKeeper() throws IOException, + KeeperException { + + // Unfortunately, the ZooKeeper constructor connects to ZooKeeper and + // may trigger the Connected event immediately. So, if we register the + // watcher after constructing ZooKeeper, we may miss that event. Instead, + // we construct the watcher first, and have it queue any events it receives + // before we can set its ZooKeeper reference. + WatcherWithClientRef watcher = new WatcherWithClientRef(); + ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher); + watcher.setZooKeeperRef(zk); + + // Wait for the asynchronous success/failure. This may throw an exception + // if we don't connect within the session timeout. + watcher.waitForZKConnectionEvent(zkSessionTimeout); + for (ZKAuthInfo auth : zkAuthInfo) { zk.addAuthInfo(auth.getScheme(), auth.getAuth()); } @@ -710,13 +727,16 @@ public class ActiveStandbyElector implements StatCallback, StringCallback { } catch(IOException e) { LOG.warn(e); sleepFor(5000); + } catch(KeeperException e) { + LOG.warn(e); + sleepFor(5000); } ++connectionRetryCount; } return success; } - private void createConnection() throws IOException { + private void createConnection() throws IOException, KeeperException { if (zkClient != null) { try { zkClient.close(); @@ -973,14 +993,76 @@ public class ActiveStandbyElector implements StatCallback, StringCallback { * events. */ private final class WatcherWithClientRef implements Watcher { - private final ZooKeeper zk; + private ZooKeeper zk; + + /** + * Latch fired whenever any event arrives. This is used in order + * to wait for the Connected event when the client is first created. + */ + private CountDownLatch hasReceivedEvent = new CountDownLatch(1); + + /** + * If any events arrive before the reference to ZooKeeper is set, + * they get queued up and later forwarded when the reference is + * available. + */ + private final List queuedEvents = Lists.newLinkedList(); + + private WatcherWithClientRef() { + } private WatcherWithClientRef(ZooKeeper zk) { this.zk = zk; } + + /** + * Waits for the next event from ZooKeeper to arrive. + * + * @param connectionTimeoutMs zookeeper connection timeout in milliseconds + * @throws KeeperException if the connection attempt times out. This will + * be a ZooKeeper ConnectionLoss exception code. + * @throws IOException if interrupted while connecting to ZooKeeper + */ + private void waitForZKConnectionEvent(int connectionTimeoutMs) + throws KeeperException, IOException { + try { + if (!hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)) { + LOG.error("Connection timed out: couldn't connect to ZooKeeper in " + + connectionTimeoutMs + " milliseconds"); + synchronized (this) { + zk.close(); + } + throw KeeperException.create(Code.CONNECTIONLOSS); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + "Interrupted when connecting to zookeeper server", e); + } + } + + private synchronized void setZooKeeperRef(ZooKeeper zk) { + Preconditions.checkState(this.zk == null, + "zk already set -- must be set exactly once"); + this.zk = zk; + + for (WatchedEvent e : queuedEvents) { + forwardEvent(e); + } + queuedEvents.clear(); + } @Override - public void process(WatchedEvent event) { + public synchronized void process(WatchedEvent event) { + if (zk != null) { + forwardEvent(event); + } else { + queuedEvents.add(event); + } + } + + private void forwardEvent(WatchedEvent event) { + hasReceivedEvent.countDown(); try { ActiveStandbyElector.this.processWatchEvent( zk, event); @@ -1024,5 +1106,4 @@ public class ActiveStandbyElector implements StatCallback, StringCallback { ((appData == null) ? "null" : StringUtils.byteToHexString(appData)) + " cb=" + appClient; } - } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java index 35d75b72ae5..712c1d0f182 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java @@ -180,7 +180,15 @@ public abstract class ZKFailoverController { private int doRun(String[] args) throws HadoopIllegalArgumentException, IOException, InterruptedException { - initZK(); + try { + initZK(); + } catch (KeeperException ke) { + LOG.fatal("Unable to start failover controller. Unable to connect " + + "to ZooKeeper quorum at " + zkQuorum + ". Please check the " + + "configured value for " + ZK_QUORUM_KEY + " and ensure that " + + "ZooKeeper is running."); + return ERR_CODE_NO_ZK; + } if (args.length > 0) { if ("-formatZK".equals(args[0])) { boolean force = false; @@ -199,24 +207,12 @@ public abstract class ZKFailoverController { badArg(args[0]); } } - - try { - if (!elector.parentZNodeExists()) { - LOG.fatal("Unable to start failover controller. " + - "Parent znode does not exist.\n" + - "Run with -formatZK flag to initialize ZooKeeper."); - return ERR_CODE_NO_PARENT_ZNODE; - } - } catch (IOException ioe) { - if (ioe.getCause() instanceof KeeperException.ConnectionLossException) { - LOG.fatal("Unable to start failover controller. Unable to connect " + - "to ZooKeeper quorum at " + zkQuorum + ". Please check the " + - "configured value for " + ZK_QUORUM_KEY + " and ensure that " + - "ZooKeeper is running."); - return ERR_CODE_NO_ZK; - } else { - throw ioe; - } + + if (!elector.parentZNodeExists()) { + LOG.fatal("Unable to start failover controller. " + + "Parent znode does not exist.\n" + + "Run with -formatZK flag to initialize ZooKeeper."); + return ERR_CODE_NO_PARENT_ZNODE; } try { @@ -310,7 +306,8 @@ public abstract class ZKFailoverController { } - private void initZK() throws HadoopIllegalArgumentException, IOException { + private void initZK() throws HadoopIllegalArgumentException, IOException, + KeeperException { zkQuorum = conf.get(ZK_QUORUM_KEY); int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY, ZK_SESSION_TIMEOUT_DEFAULT); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java index 2eba9671a34..c2dc23abccd 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java @@ -42,6 +42,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException; import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo; +import org.apache.hadoop.test.GenericTestUtils; public class TestActiveStandbyElector { @@ -56,7 +57,8 @@ public class TestActiveStandbyElector { private int sleptFor = 0; ActiveStandbyElectorTester(String hostPort, int timeout, String parent, - List acl, ActiveStandbyElectorCallback app) throws IOException { + List acl, ActiveStandbyElectorCallback app) throws IOException, + KeeperException { super(hostPort, timeout, parent, acl, Collections.emptyList(), app); } @@ -83,7 +85,7 @@ public class TestActiveStandbyElector { ActiveStandbyElector.BREADCRUMB_FILENAME; @Before - public void init() throws IOException { + public void init() throws IOException, KeeperException { count = 0; mockZK = Mockito.mock(ZooKeeper.class); mockApp = Mockito.mock(ActiveStandbyElectorCallback.class); @@ -705,4 +707,18 @@ public class TestActiveStandbyElector { Mockito.eq(ZK_PARENT_NAME), Mockito.any(), Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT)); } + + /** + * verify the zookeeper connection establishment + */ + @Test + public void testWithoutZKServer() throws Exception { + try { + new ActiveStandbyElector("127.0.0.1", 2000, ZK_PARENT_NAME, + Ids.OPEN_ACL_UNSAFE, Collections. emptyList(), mockApp); + Assert.fail("Did not throw zookeeper connection loss exceptions!"); + } catch (KeeperException ke) { + GenericTestUtils.assertExceptionContains( "ConnectionLoss", ke); + } + } }