diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d4dd07d4360..0f7d180fcde 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -45,6 +45,9 @@ Release 2.6.2 - UNRELEASED YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id has not been reset synchronously. (Jun Gong via rohithsharmaks) + YARN-3798. ZKRMStateStore shouldn't create new session without occurrance of + SESSIONEXPIED. (ozawa and Varun Saxena) + Release 2.6.1 - 2015-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 8abc64ed0dc..0475a68f9fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -28,6 +28,8 @@ import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.Ap import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -110,6 +113,20 @@ public class ZKRMStateStore extends RMStateStore { private List zkAcl; private List zkAuths; + class ZKSyncOperationCallback implements AsyncCallback.VoidCallback { + public final CountDownLatch latch = new CountDownLatch(1); + @Override + public void processResult(int rc, String path, Object ctx){ + if (rc == Code.OK.intValue()) { + LOG.info("ZooKeeper sync operation succeeded. path: " + path); + latch.countDown(); + } else { + LOG.fatal("ZooKeeper sync operation failed. Waiting for session " + + "timeout. path: " + path); + } + } + } + /** * * ROOT_DIR_PATH @@ -295,6 +312,7 @@ public class ZKRMStateStore extends RMStateStore { createRootDir(delegationTokensRootPath); createRootDir(dtSequenceNumberPath); createRootDir(amrmTokenSecretManagerRoot); + syncInternal(zkRootNodePath); } private void createRootDir(final String rootPath) throws Exception { @@ -915,6 +933,7 @@ public class ZKRMStateStore extends RMStateStore { // call listener to reconnect LOG.info("ZKRMStateStore Session expired"); createConnection(); + syncInternal(event.getPath()); break; default: LOG.error("Unexpected Zookeeper" + @@ -931,6 +950,27 @@ public class ZKRMStateStore extends RMStateStore { return (root + "/" + nodeName); } + /** + * Helper method to call ZK's sync() after calling createConnection(). + * Note that sync path is meaningless for now: + * http://mail-archives.apache.org/mod_mbox/zookeeper-user/201102.mbox/browser + * @param path path to sync, nullable value. If the path is null, + * zkRootNodePath is used to sync. + * @return true if ZK.sync() succeededs, false if ZK.sync() fails. + * @throws InterruptedException + */ + private boolean syncInternal(String path) throws InterruptedException { + ZKSyncOperationCallback cb = new ZKSyncOperationCallback(); + if (path != null) { + zkClient.sync(path, cb, null); + } else { + zkClient.sync(zkRootNodePath, cb, null); + } + boolean succeededToSync = cb.latch.await( + zkSessionTimeout, TimeUnit.MILLISECONDS); + return succeededToSync; + } + /** * Helper method that creates fencing node, executes the passed operations, * and deletes the fencing node. @@ -1091,6 +1131,18 @@ public class ZKRMStateStore extends RMStateStore { switch (code) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: + return true; + default: + break; + } + return false; + } + + private boolean shouldRetryWithNewConnection(Code code) { + // For fast recovery, we choose to close current connection after + // SESSIONMOVED occurs. Latest state of a zknode path is ensured by + // following zk.sync(path) operation. + switch (code) { case SESSIONEXPIRED: case SESSIONMOVED: return true; @@ -1118,12 +1170,34 @@ public class ZKRMStateStore extends RMStateStore { return null; } LOG.info("Exception while executing a ZK operation.", ke); - if (shouldRetry(ke.code()) && ++retry < numRetries) { + retry++; + if (shouldRetry(ke.code()) && retry < numRetries) { LOG.info("Retrying operation on ZK. Retry no. " + retry); Thread.sleep(zkRetryInterval); - createConnection(); continue; } + if (shouldRetryWithNewConnection(ke.code()) && retry < numRetries) { + LOG.info("Retrying operation on ZK with new Connection. " + + "Retry no. " + retry); + Thread.sleep(zkRetryInterval); + createConnection(); + boolean succeededToSync = false; + try { + succeededToSync = syncInternal(ke.getPath()); + } catch (InterruptedException ie) { + LOG.info("Interrupted sync operation. Giving up!"); + Thread.currentThread().interrupt(); + throw ke; + } + if (succeededToSync) { + // continue the operation. + continue; + } else { + // Giving up since new connection without sync can occur an + // unexpected view from the client like YARN-3798. + LOG.info("Failed to sync with ZK new connection."); + } + } LOG.info("Maxed out ZK retries. Giving up!"); throw ke; }