diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java index 7d0727abd15..5f031337b75 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java @@ -90,6 +90,14 @@ public abstract class ClientBaseWithFixes extends ZKTestCase { // XXX this doesn't need to be volatile! (Should probably be final) volatile CountDownLatch clientConnected; volatile boolean connected; + protected ZooKeeper client; + + public void initializeWatchedClient(ZooKeeper zk) { + if (client != null) { + throw new RuntimeException("Watched Client was already set"); + } + client = zk; + } public CountdownWatcher() { reset(); @@ -191,8 +199,7 @@ public abstract class ClientBaseWithFixes extends ZKTestCase { zk.close(); } } - - + watcher.initializeWatchedClient(zk); return zk; } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e648e3011b0..8313b48e495 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -659,6 +659,9 @@ Release 2.7.0 - UNRELEASED YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending jobs. (Siqi Li via kasha) + YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving + events for old client. (Zhihai Xu via kasha) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 591a5511785..614ef15e43e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -153,7 +153,13 @@ public class ZKRMStateStore extends RMStateStore { @VisibleForTesting protected ZooKeeper zkClient; - private ZooKeeper oldZkClient; + + /* activeZkClient is not used to do actual operations, + * it is only used to verify client session for watched events and + * it gets activated into zkClient on connection event. + */ + @VisibleForTesting + ZooKeeper activeZkClient; /** Fencing related variables */ private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; @@ -355,21 +361,14 @@ public class ZKRMStateStore extends RMStateStore { } private synchronized void closeZkClients() throws IOException { - if (zkClient != null) { + zkClient = null; + if (activeZkClient != null) { try { - zkClient.close(); + activeZkClient.close(); } catch (InterruptedException e) { throw new IOException("Interrupted while closing ZK", e); } - zkClient = null; - } - if (oldZkClient != null) { - try { - oldZkClient.close(); - } catch (InterruptedException e) { - throw new IOException("Interrupted while closing old ZK", e); - } - oldZkClient = null; + activeZkClient = null; } } @@ -830,11 +829,16 @@ public class ZKRMStateStore extends RMStateStore { * hides the ZK methods of the store from its public interface */ private final class ForwardingWatcher implements Watcher { + private ZooKeeper watchedZkClient; + + public ForwardingWatcher(ZooKeeper client) { + this.watchedZkClient = client; + } @Override public void process(WatchedEvent event) { try { - ZKRMStateStore.this.processWatchEvent(event); + ZKRMStateStore.this.processWatchEvent(watchedZkClient, event); } catch (Throwable t) { LOG.error("Failed to process watcher event " + event + ": " + StringUtils.stringifyException(t)); @@ -845,8 +849,16 @@ public class ZKRMStateStore extends RMStateStore { @VisibleForTesting @Private @Unstable - public synchronized void processWatchEvent(WatchedEvent event) - throws Exception { + public synchronized void processWatchEvent(ZooKeeper zk, + WatchedEvent event) throws Exception { + // only process watcher event from current ZooKeeper Client session. + if (zk != activeZkClient) { + LOG.info("Ignore watcher event type: " + event.getType() + + " with state:" + event.getState() + " for path:" + + event.getPath() + " from old session"); + return; + } + Event.EventType eventType = event.getType(); LOG.info("Watcher event type: " + eventType + " with state:" + event.getState() + " for path:" + event.getPath() + " for " + this); @@ -857,17 +869,15 @@ public class ZKRMStateStore extends RMStateStore { switch (event.getState()) { case SyncConnected: LOG.info("ZKRMStateStore Session connected"); - if (oldZkClient != null) { + if (zkClient == null) { // the SyncConnected must be from the client that sent Disconnected - zkClient = oldZkClient; - oldZkClient = null; + zkClient = activeZkClient; ZKRMStateStore.this.notifyAll(); LOG.info("ZKRMStateStore Session restored"); } break; case Disconnected: LOG.info("ZKRMStateStore Session disconnected"); - oldZkClient = zkClient; zkClient = null; break; case Expired: @@ -1100,7 +1110,8 @@ public class ZKRMStateStore extends RMStateStore { for (int retries = 0; retries < numRetries && zkClient == null; retries++) { try { - zkClient = getNewZooKeeper(); + activeZkClient = getNewZooKeeper(); + zkClient = activeZkClient; for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth()); } @@ -1130,7 +1141,7 @@ public class ZKRMStateStore extends RMStateStore { protected synchronized ZooKeeper getNewZooKeeper() throws IOException, InterruptedException { ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null); - zk.register(new ForwardingWatcher()); + zk.register(new ForwardingWatcher(zk)); return zk; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java index 8dc3628e2a6..62dc5efd394 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java @@ -71,6 +71,7 @@ public class TestZKRMStateStoreZKClientConnections extends ZKRMStateStore store; boolean forExpire = false; + TestForwardingWatcher oldWatcher; TestForwardingWatcher watcher; CyclicBarrier syncBarrier = new CyclicBarrier(2); @@ -86,35 +87,36 @@ public class TestZKRMStateStoreZKClientConnections extends @Override public ZooKeeper getNewZooKeeper() throws IOException, InterruptedException { + oldWatcher = watcher; + watcher = new TestForwardingWatcher(); return createClient(watcher, hostPort, ZK_TIMEOUT_MS); } @Override - public synchronized void processWatchEvent(WatchedEvent event) - throws Exception { + public synchronized void processWatchEvent(ZooKeeper zk, + WatchedEvent event) throws Exception { if (forExpire) { // a hack... couldn't find a way to trigger expired event. WatchedEvent expriredEvent = new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null); - super.processWatchEvent(expriredEvent); + super.processWatchEvent(zk, expriredEvent); forExpire = false; syncBarrier.await(); } else { - super.processWatchEvent(event); + super.processWatchEvent(zk, event); } } } private class TestForwardingWatcher extends ClientBaseWithFixes.CountdownWatcher { - public void process(WatchedEvent event) { super.process(event); try { if (store != null) { - store.processWatchEvent(event); + store.processWatchEvent(client, event); } } catch (Throwable t) { LOG.error("Failed to process watcher event " + event + ": " @@ -127,7 +129,6 @@ public class TestZKRMStateStoreZKClientConnections extends String workingZnode = "/Test"; conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); - watcher = new TestForwardingWatcher(); this.store = new TestZKRMStateStore(conf, workingZnode); return this.store; } @@ -239,6 +240,24 @@ public class TestZKRMStateStoreZKClientConnections extends LOG.error(error, e); fail(error); } + + // send Disconnected event from old client session to ZKRMStateStore + // check the current client session is not affected. + Assert.assertTrue(zkClientTester.oldWatcher != null); + WatchedEvent disconnectedEvent = new WatchedEvent( + Watcher.Event.EventType.None, + Watcher.Event.KeeperState.Disconnected, null); + zkClientTester.oldWatcher.process(disconnectedEvent); + Assert.assertTrue(store.zkClient != null); + + zkClientTester.watcher.process(disconnectedEvent); + Assert.assertTrue(store.zkClient == null); + WatchedEvent connectedEvent = new WatchedEvent( + Watcher.Event.EventType.None, + Watcher.Event.KeeperState.SyncConnected, null); + zkClientTester.watcher.process(connectedEvent); + Assert.assertTrue(store.zkClient != null); + Assert.assertTrue(store.zkClient == store.activeZkClient); } @Test(timeout = 20000)