YARN-4348. ZKRMStateStore.syncInternal shouldn't wait for sync completion for avoiding blocking ZK's event thread. (ozawa)

(cherry picked from commit 0460b8a8a3)
This commit is contained in:
Tsuyoshi Ozawa 2015-12-08 13:31:23 +09:00
parent 203c7a648b
commit b345ffd7df
2 changed files with 18 additions and 27 deletions

View File

@ -51,6 +51,9 @@ Release 2.6.3 - UNRELEASED
YARN-4365. FileSystemNodeLabelStore should check for root dir existence on
startup (Kuhu Shukla via jlowe)
YARN-4348. ZKRMStateStore.syncInternal shouldn't wait for sync completion for
avoiding blocking ZK's event thread. (ozawa)
Release 2.6.2 - 2015-10-28
INCOMPATIBLE CHANGES

View File

@ -114,12 +114,10 @@ public class ZKRMStateStore extends RMStateStore {
private List<ZKUtil.ZKAuthInfo> zkAuths;
class ZKSyncOperationCallback implements AsyncCallback.VoidCallback {
public final CountDownLatch latch = new CountDownLatch(1);
@Override
public void processResult(int rc, String path, Object ctx){
if (rc == Code.OK.intValue()) {
LOG.info("ZooKeeper sync operation succeeded. path: " + path);
latch.countDown();
} else {
LOG.fatal("ZooKeeper sync operation failed. Waiting for session " +
"timeout. path: " + path);
@ -959,16 +957,20 @@ public class ZKRMStateStore extends RMStateStore {
* @return true if ZK.sync() succeededs, false if ZK.sync() fails.
* @throws InterruptedException
*/
private boolean syncInternal(String path) throws InterruptedException {
ZKSyncOperationCallback cb = new ZKSyncOperationCallback();
if (path != null) {
zkClient.sync(path, cb, null);
} else {
zkClient.sync(zkRootNodePath, cb, null);
private void syncInternal(final String path) throws InterruptedException {
final ZKSyncOperationCallback cb = new ZKSyncOperationCallback();
final String pathForSync = (path != null) ? path : zkRootNodePath;
try {
new ZKAction<Void>() {
@Override
Void run() throws KeeperException, InterruptedException {
zkClient.sync(pathForSync, cb, null);
return null;
}
}.runWithRetries();
} catch (Exception e) {
LOG.fatal("sync failed.");
}
boolean succeededToSync = cb.latch.await(
zkSessionTimeout, TimeUnit.MILLISECONDS);
return succeededToSync;
}
/**
@ -1181,22 +1183,8 @@ public class ZKRMStateStore extends RMStateStore {
"Retry no. " + retry);
Thread.sleep(zkRetryInterval);
createConnection();
boolean succeededToSync = false;
try {
succeededToSync = syncInternal(ke.getPath());
} catch (InterruptedException ie) {
LOG.info("Interrupted sync operation. Giving up!");
Thread.currentThread().interrupt();
throw ke;
}
if (succeededToSync) {
// continue the operation.
syncInternal(ke.getPath());
continue;
} else {
// Giving up since new connection without sync can occur an
// unexpected view from the client like YARN-3798.
LOG.info("Failed to sync with ZK new connection.");
}
}
LOG.info("Maxed out ZK retries. Giving up!");
throw ke;