From 08c02199ffd212eedd66ff990092bbad36ba2378 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ozawa Date: Mon, 19 Oct 2015 10:30:28 +0900 Subject: [PATCH] YARN-3798. ZKRMStateStore shouldn't create new session without occurrence of SESSIONEXPIRED. (ozawa and Varun Saxena) --- hadoop-yarn-project/CHANGES.txt | 3 + .../recovery/ZKRMStateStore.java | 80 ++++++++++++++++++- 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c1724c80e16..8bbc736ec69 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -109,6 +109,9 @@ Release 2.7.2 - UNRELEASED YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id has not been reset synchronously. (Jun Gong via rohithsharmaks) + YARN-3798. ZKRMStateStore shouldn't create new session without occurrence of + SESSIONEXPIRED. (ozawa and Varun Saxena) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 82ac2c13a49..3eeb895256c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -31,6 +31,8 @@ import java.util.Collections; import java.util.List; import com.google.common.base.Preconditions; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.Ap import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -112,6 +115,20 @@ public class ZKRMStateStore extends RMStateStore { private List zkAcl; private List zkAuths; + class ZKSyncOperationCallback implements AsyncCallback.VoidCallback { + public final CountDownLatch latch = new CountDownLatch(1); + @Override + public void processResult(int rc, String path, Object ctx){ + if (rc == Code.OK.intValue()) { + LOG.info("ZooKeeper sync operation succeeded. path: " + path); + latch.countDown(); + } else { + LOG.fatal("ZooKeeper sync operation failed. Waiting for session " + + "timeout. 
path: " + path); + } + } + } + /** * * ROOT_DIR_PATH @@ -225,6 +242,7 @@ public class ZKRMStateStore extends RMStateStore { znodeWorkingPath = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH); + zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); @@ -298,6 +316,8 @@ public class ZKRMStateStore extends RMStateStore { createRootDir(delegationTokensRootPath); createRootDir(dtSequenceNumberPath); createRootDir(amrmTokenSecretManagerRoot); + + syncInternal(zkRootNodePath); } private void createRootDir(final String rootPath) throws Exception { @@ -888,6 +908,7 @@ public class ZKRMStateStore extends RMStateStore { // call listener to reconnect LOG.info("ZKRMStateStore Session expired"); createConnection(); + syncInternal(event.getPath()); break; default: LOG.error("Unexpected Zookeeper" + @@ -904,6 +925,27 @@ public class ZKRMStateStore extends RMStateStore { return (root + "/" + nodeName); } + /** + * Helper method to call ZK's sync() after calling createConnection(). + * Note that sync path is meaningless for now: + * http://mail-archives.apache.org/mod_mbox/zookeeper-user/201102.mbox/browser + * @param path path to sync, nullable value. If the path is null, + * zkRootNodePath is used to sync. + * @return true if ZK.sync() succeeds, false if ZK.sync() fails. + * @throws InterruptedException + */ + private boolean syncInternal(String path) throws InterruptedException { + ZKSyncOperationCallback cb = new ZKSyncOperationCallback(); + if (path != null) { + zkClient.sync(path, cb, null); + } else { + zkClient.sync(zkRootNodePath, cb, null); + } + boolean succeededToSync = cb.latch.await( + zkSessionTimeout, TimeUnit.MILLISECONDS); + return succeededToSync; + } + /** * Helper method that creates fencing node, executes the passed operations, * and deletes the fencing node. 
@@ -1100,6 +1142,18 @@ public class ZKRMStateStore extends RMStateStore { switch (code) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: + return true; + default: + break; + } + return false; + } + + private boolean shouldRetryWithNewConnection(Code code) { + // For fast recovery, we choose to close current connection after + // SESSIONMOVED occurs. Latest state of a path is assured by a following + // zk.sync(path) operation. + switch (code) { case SESSIONEXPIRED: case SESSIONMOVED: return true; @@ -1132,12 +1186,34 @@ public class ZKRMStateStore extends RMStateStore { } LOG.info("Exception while executing a ZK operation.", ke); - if (shouldRetry(ke.code()) && ++retry < numRetries) { + retry++; + if (shouldRetry(ke.code()) && retry < numRetries) { LOG.info("Retrying operation on ZK. Retry no. " + retry); Thread.sleep(zkRetryInterval); - createConnection(); continue; } + if (shouldRetryWithNewConnection(ke.code()) && retry < numRetries) { + LOG.info("Retrying operation on ZK with new Connection. " + + "Retry no. " + retry); + Thread.sleep(zkRetryInterval); + createConnection(); + boolean succeededToSync = false; + try { + succeededToSync = syncInternal(ke.getPath()); + } catch (InterruptedException ie) { + LOG.info("Interrupted sync operation. Giving up!"); + Thread.currentThread().interrupt(); + throw ke; + } + if (succeededToSync) { + // continue the operation. + continue; + } else { + // Giving up since new connection without sync can cause an + // unexpected view from the client like YARN-3798. + LOG.info("Failed to sync with ZK new connection."); + } + } LOG.info("Maxed out ZK retries. Giving up!"); throw ke; }