From d127dd64a744f489985315c1dfa9de1e53a9b64a Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 6 May 2015 17:51:17 -0700 Subject: [PATCH] YARN-3385. Fixed a race-condition in ResourceManager's ZooKeeper based state-store to avoid crashing on duplicate deletes. Contributed by Zhihai Xu. (cherry picked from commit 4c7b9b6abe2452c9752a11214762be2e7665fb32) --- hadoop-yarn-project/CHANGES.txt | 3 + .../recovery/ZKRMStateStore.java | 64 +++++++++++++++---- .../recovery/TestZKRMStateStore.java | 35 ++++++++++ 3 files changed, 89 insertions(+), 13 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c11729a8c1b..63d5c1a4c9d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -331,6 +331,9 @@ Release 2.7.1 - UNRELEASED YARN-3301. Fixed the format issue of the new RM attempt web page. (Xuan Gong via jianhe) + YARN-3385. Fixed a race-condition in ResourceManager's ZooKeeper based + state-store to avoid crashing on duplicate deletes. 
(Zhihai Xu via vinodkv) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 97dd02990d8..364c970c52d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -694,7 +694,7 @@ public class ZKRMStateStore extends RMStateStore { LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath + " and its attempts."); } - doMultiWithRetries(opList); + doDeleteMultiWithRetries(opList); } @Override @@ -703,13 +703,12 @@ public class ZKRMStateStore extends RMStateStore { throws Exception { ArrayList opList = new ArrayList(); addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false); - doMultiWithRetries(opList); + doStoreMultiWithRetries(opList); } @Override protected synchronized void removeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { - ArrayList opList = new ArrayList(); String nodeRemovePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); @@ -718,11 +717,12 @@ public class ZKRMStateStore extends RMStateStore { + rmDTIdentifier.getSequenceNumber()); } if (existsWithRetries(nodeRemovePath, false) != null) { + ArrayList opList = new ArrayList(); opList.add(Op.delete(nodeRemovePath, -1)); + doDeleteMultiWithRetries(opList); } else { LOG.debug("Attempted to delete a non-existing znode " + 
nodeRemovePath); } - doMultiWithRetries(opList); } @Override @@ -741,7 +741,7 @@ public class ZKRMStateStore extends RMStateStore { // in case znode exists addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true); } - doMultiWithRetries(opList); + doStoreMultiWithRetries(opList); } private void addStoreOrUpdateOps(ArrayList opList, @@ -810,7 +810,7 @@ public class ZKRMStateStore extends RMStateStore { LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); } if (existsWithRetries(nodeRemovePath, false) != null) { - doMultiWithRetries(Op.delete(nodeRemovePath, -1)); + doDeleteMultiWithRetries(Op.delete(nodeRemovePath, -1)); } else { LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); } @@ -914,7 +914,7 @@ public class ZKRMStateStore extends RMStateStore { * Helper method that creates fencing node, executes the passed operations, * and deletes the fencing node. */ - private synchronized void doMultiWithRetries( + private synchronized void doStoreMultiWithRetries( final List opList) throws Exception { final List execOpList = new ArrayList(opList.size() + 2); execOpList.add(createFencingNodePathOp); @@ -933,8 +933,32 @@ public class ZKRMStateStore extends RMStateStore { * Helper method that creates fencing node, executes the passed operation, * and deletes the fencing node. */ - private void doMultiWithRetries(final Op op) throws Exception { - doMultiWithRetries(Collections.singletonList(op)); + private void doStoreMultiWithRetries(final Op op) throws Exception { + doStoreMultiWithRetries(Collections.singletonList(op)); + } + + /** + * Helper method that creates fencing node, executes the passed + * delete related operations and deletes the fencing node. 
+ */ + private synchronized void doDeleteMultiWithRetries( + final List opList) throws Exception { + final List execOpList = new ArrayList(opList.size() + 2); + execOpList.add(createFencingNodePathOp); + execOpList.addAll(opList); + execOpList.add(deleteFencingNodePathOp); + new ZKAction<Void>() { + @Override + public Void run() throws KeeperException, InterruptedException { + setHasDeleteNodeOp(true); + zkClient.multi(execOpList); + return null; + } + }.runWithRetries(); + } + + private void doDeleteMultiWithRetries(final Op op) throws Exception { + doDeleteMultiWithRetries(Collections.singletonList(op)); } @VisibleForTesting @@ -943,7 +967,7 @@ public class ZKRMStateStore extends RMStateStore { public void createWithRetries( final String path, final byte[] data, final List acl, final CreateMode mode) throws Exception { - doMultiWithRetries(Op.create(path, data, acl, mode)); + doStoreMultiWithRetries(Op.create(path, data, acl, mode)); } @VisibleForTesting @@ -951,7 +975,7 @@ public class ZKRMStateStore extends RMStateStore { @Unstable public void setDataWithRetries(final String path, final byte[] data, final int version) throws Exception { - doMultiWithRetries(Op.setData(path, data, version)); + doStoreMultiWithRetries(Op.setData(path, data, version)); } @VisibleForTesting @@ -1017,7 +1041,12 @@ public class ZKRMStateStore extends RMStateStore { for (String child : children) { recursiveDeleteWithRetriesHelper(path + "/" + child, false); } - zkClient.delete(path, -1); + + try { + zkClient.delete(path, -1); + } catch (KeeperException.NoNodeException nne) { + LOG.info("Node " + path + " doesn't exist to delete"); + } } /** @@ -1037,7 +1066,7 @@ public class ZKRMStateStore extends RMStateStore { if(isFencedState()) { break; } - doMultiWithRetries(emptyOpList); + doStoreMultiWithRetries(emptyOpList); Thread.sleep(zkSessionTimeout); } } catch (InterruptedException ie) { @@ -1050,6 +1079,10 @@ public class ZKRMStateStore extends RMStateStore { } private abstract class ZKAction<T>
{ + private boolean hasDeleteNodeOp = false; + void setHasDeleteNodeOp(boolean hasDeleteOp) { + this.hasDeleteNodeOp = hasDeleteOp; + } // run() expects synchronization on ZKRMStateStore.this abstract T run() throws KeeperException, InterruptedException; @@ -1099,6 +1132,11 @@ public class ZKRMStateStore extends RMStateStore { LOG.info("znode already exists!"); return null; } + if (hasDeleteNodeOp && ke.code() == Code.NONODE) { + LOG.info("znode has already been deleted!"); + return null; + } + LOG.info("Exception while executing a ZK operation.", ke); if (shouldRetry(ke.code()) && ++retry < numRetries) { LOG.info("Retrying operation on ZK. Retry no. " + retry); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index c632a061e3f..333455cdbec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.HashMap; import java.util.List; import javax.crypto.SecretKey; @@ -38,6 +39,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import 
org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -58,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Assert; @@ -381,4 +384,36 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { store.close(); } + + @Test + public void testDuplicateRMAppDeletion() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + long submitTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis() + 1234; + RMStateStore store = zkTester.getRMStateStore(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + ApplicationAttemptId attemptIdRemoved = ConverterUtils + .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); + ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); + storeApp(store, appIdRemoved, submitTime, startTime); + storeAttempt(store, attemptIdRemoved, + "container_1352994193343_0002_01_000001", null, null, dispatcher); + + ApplicationSubmissionContext context = + new ApplicationSubmissionContextPBImpl(); + context.setApplicationId(appIdRemoved); + ApplicationStateData appStateRemoved = + ApplicationStateData.newInstance( + submitTime, startTime, context, "user1"); + appStateRemoved.attempts.put(attemptIdRemoved, null); + store.removeApplicationStateInternal(appStateRemoved); + try { + store.removeApplicationStateInternal(appStateRemoved); + } catch (KeeperException.NoNodeException nne) { + 
Assert.fail("NoNodeException should not happen."); + } + store.close(); + } }