YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving events for old client. (Zhihai Xu via kasha)

(cherry picked from commit 8d88691d16)
This commit is contained in:
Karthik Kambatla 2015-03-04 19:47:02 -08:00
parent f9a2007aff
commit 0d62e94887
4 changed files with 70 additions and 30 deletions

View File

@ -90,6 +90,14 @@ protected static class CountdownWatcher implements Watcher {
// XXX this doesn't need to be volatile! (Should probably be final)
volatile CountDownLatch clientConnected;
volatile boolean connected;
protected ZooKeeper client;
public void initializeWatchedClient(ZooKeeper zk) {
if (client != null) {
throw new RuntimeException("Watched Client was already set");
}
client = zk;
}
public CountdownWatcher() {
reset();
@ -191,8 +199,7 @@ protected TestableZooKeeper createClient(CountdownWatcher watcher,
zk.close();
}
}
watcher.initializeWatchedClient(zk);
return zk;
}

View File

@ -659,6 +659,9 @@ Release 2.7.0 - UNRELEASED
YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending
jobs. (Siqi Li via kasha)
YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving
events for old client. (Zhihai Xu via kasha)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -153,7 +153,13 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting
protected ZooKeeper zkClient;
private ZooKeeper oldZkClient;
/* activeZkClient is not used to do actual operations,
* it is only used to verify client session for watched events and
* it gets activated into zkClient on connection event.
*/
@VisibleForTesting
ZooKeeper activeZkClient;
/** Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@ -355,21 +361,14 @@ public Void run() throws KeeperException, InterruptedException {
}
private synchronized void closeZkClients() throws IOException {
if (zkClient != null) {
zkClient = null;
if (activeZkClient != null) {
try {
zkClient.close();
activeZkClient.close();
} catch (InterruptedException e) {
throw new IOException("Interrupted while closing ZK", e);
}
zkClient = null;
}
if (oldZkClient != null) {
try {
oldZkClient.close();
} catch (InterruptedException e) {
throw new IOException("Interrupted while closing old ZK", e);
}
oldZkClient = null;
activeZkClient = null;
}
}
@ -830,11 +829,16 @@ public synchronized void deleteStore() throws Exception {
* hides the ZK methods of the store from its public interface
*/
private final class ForwardingWatcher implements Watcher {
private ZooKeeper watchedZkClient;
public ForwardingWatcher(ZooKeeper client) {
this.watchedZkClient = client;
}
@Override
public void process(WatchedEvent event) {
try {
ZKRMStateStore.this.processWatchEvent(event);
ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);
} catch (Throwable t) {
LOG.error("Failed to process watcher event " + event + ": "
+ StringUtils.stringifyException(t));
@ -845,8 +849,16 @@ public void process(WatchedEvent event) {
@VisibleForTesting
@Private
@Unstable
public synchronized void processWatchEvent(WatchedEvent event)
throws Exception {
public synchronized void processWatchEvent(ZooKeeper zk,
WatchedEvent event) throws Exception {
// only process watcher event from current ZooKeeper Client session.
if (zk != activeZkClient) {
LOG.info("Ignore watcher event type: " + event.getType() +
" with state:" + event.getState() + " for path:" +
event.getPath() + " from old session");
return;
}
Event.EventType eventType = event.getType();
LOG.info("Watcher event type: " + eventType + " with state:"
+ event.getState() + " for path:" + event.getPath() + " for " + this);
@ -857,17 +869,15 @@ public synchronized void processWatchEvent(WatchedEvent event)
switch (event.getState()) {
case SyncConnected:
LOG.info("ZKRMStateStore Session connected");
if (oldZkClient != null) {
if (zkClient == null) {
// the SyncConnected must be from the client that sent Disconnected
zkClient = oldZkClient;
oldZkClient = null;
zkClient = activeZkClient;
ZKRMStateStore.this.notifyAll();
LOG.info("ZKRMStateStore Session restored");
}
break;
case Disconnected:
LOG.info("ZKRMStateStore Session disconnected");
oldZkClient = zkClient;
zkClient = null;
break;
case Expired:
@ -1100,7 +1110,8 @@ private synchronized void createConnection()
for (int retries = 0; retries < numRetries && zkClient == null;
retries++) {
try {
zkClient = getNewZooKeeper();
activeZkClient = getNewZooKeeper();
zkClient = activeZkClient;
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
}
@ -1130,7 +1141,7 @@ private synchronized void createConnection()
protected synchronized ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException {
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
zk.register(new ForwardingWatcher());
zk.register(new ForwardingWatcher(zk));
return zk;
}

View File

@ -71,6 +71,7 @@ class TestZKClient {
ZKRMStateStore store;
boolean forExpire = false;
TestForwardingWatcher oldWatcher;
TestForwardingWatcher watcher;
CyclicBarrier syncBarrier = new CyclicBarrier(2);
@ -86,35 +87,36 @@ public TestZKRMStateStore(Configuration conf, String workingZnode)
@Override
public ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException {
oldWatcher = watcher;
watcher = new TestForwardingWatcher();
return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
}
@Override
public synchronized void processWatchEvent(WatchedEvent event)
throws Exception {
public synchronized void processWatchEvent(ZooKeeper zk,
WatchedEvent event) throws Exception {
if (forExpire) {
// a hack... couldn't find a way to trigger expired event.
WatchedEvent expriredEvent = new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null);
super.processWatchEvent(expriredEvent);
super.processWatchEvent(zk, expriredEvent);
forExpire = false;
syncBarrier.await();
} else {
super.processWatchEvent(event);
super.processWatchEvent(zk, event);
}
}
}
private class TestForwardingWatcher extends
ClientBaseWithFixes.CountdownWatcher {
public void process(WatchedEvent event) {
super.process(event);
try {
if (store != null) {
store.processWatchEvent(event);
store.processWatchEvent(client, event);
}
} catch (Throwable t) {
LOG.error("Failed to process watcher event " + event + ": "
@ -127,7 +129,6 @@ public RMStateStore getRMStateStore(Configuration conf) throws Exception {
String workingZnode = "/Test";
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
watcher = new TestForwardingWatcher();
this.store = new TestZKRMStateStore(conf, workingZnode);
return this.store;
}
@ -239,6 +240,24 @@ public void testZKSessionTimeout() throws Exception {
LOG.error(error, e);
fail(error);
}
// send Disconnected event from old client session to ZKRMStateStore
// check the current client session is not affected.
Assert.assertTrue(zkClientTester.oldWatcher != null);
WatchedEvent disconnectedEvent = new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Disconnected, null);
zkClientTester.oldWatcher.process(disconnectedEvent);
Assert.assertTrue(store.zkClient != null);
zkClientTester.watcher.process(disconnectedEvent);
Assert.assertTrue(store.zkClient == null);
WatchedEvent connectedEvent = new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.SyncConnected, null);
zkClientTester.watcher.process(connectedEvent);
Assert.assertTrue(store.zkClient != null);
Assert.assertTrue(store.zkClient == store.activeZkClient);
}
@Test(timeout = 20000)