diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 9e4eec24c84..8a7fa4edb30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -405,7 +405,7 @@ public class ZKRMStateStore extends RMStateStore { } @Override - protected synchronized void closeInternal() throws Exception { + protected void closeInternal() throws Exception { if (verifyActiveStatusThread != null) { verifyActiveStatusThread.interrupt(); verifyActiveStatusThread.join(1000); @@ -963,8 +963,9 @@ public class ZKRMStateStore extends RMStateStore { * Helper method that creates fencing node, executes the passed operations, * and deletes the fencing node. 
*/ - private synchronized void doStoreMultiWithRetries( - final List opList) throws Exception { + @VisibleForTesting + synchronized void doStoreMultiWithRetries(final List opList) + throws Exception { final List execOpList = new ArrayList(opList.size() + 2); execOpList.add(createFencingNodePathOp); execOpList.addAll(opList); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index ea66c145bc5..f17bccc3f6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -65,14 +65,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.ConverterUtils; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.Op; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertFalse; public class TestZKRMStateStore extends RMStateStoreTestBase { @@ -80,7 +83,6 @@ public class TestZKRMStateStore extends 
RMStateStoreTestBase { private static final int ZK_TIMEOUT_MS = 1000; class TestZKRMStateStoreTester implements RMStateStoreHelper { - ZooKeeper client; TestZKRMStateStoreInternal store; String workingZnode = "/jira/issue/3077/rmstore"; @@ -91,7 +93,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { throws Exception { init(conf); start(); - assertTrue(znodeWorkingPath.equals(workingZnode)); + assertEquals(workingZnode, znodeWorkingPath); } @Override @@ -350,6 +352,101 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); } + @Test + public void testTransitionWithUnreachableZK() throws Exception { + final AtomicBoolean zkUnreachable = new AtomicBoolean(false); + final AtomicBoolean threadHung = new AtomicBoolean(false); + final Object hangLock = new Object(); + final Configuration conf = createHARMConf("rm1,rm2", "rm1", 1234); + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + + // Create a state store that can simulate losing contact with the ZK node + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() { + @Override + public RMStateStore getRMStateStore() throws Exception { + YarnConfiguration storeConf = new YarnConfiguration(conf); + storeConf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + storeConf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, + workingZnode); + storeConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 500); + this.client = createClient(); + this.store = new TestZKRMStateStoreInternal(storeConf, workingZnode) { + @Override + synchronized void doStoreMultiWithRetries(final List opList) + throws Exception { + if (zkUnreachable.get()) { + // Let the test know that it can now proceed + threadHung.set(true); + + synchronized (hangLock) { + hangLock.notify(); + } + + // Take a long nap while holding the lock to simulate the ZK node + // being unreachable. 
This behavior models what happens in + // super.doStoreMultiWithRetries() when the ZK node is unreachable. + // If that behavior changes, then this test should also change or + // be phased out. + Thread.sleep(60000); + } else { + // Business as usual + super.doStoreMultiWithRetries(opList); + } + } + }; + return this.store; + } + }; + + // Start with a single RM in HA mode + final RMStateStore store = zkTester.getRMStateStore(); + final MockRM rm = new MockRM(conf, store); + rm.start(); + + // Make the RM active + final StateChangeRequestInfo req = new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + rm.getRMContext().getRMAdminService().transitionToActive(req); + assertEquals("RM with ZKStore didn't start", + Service.STATE.STARTED, rm.getServiceState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm.getRMContext().getRMAdminService().getServiceStatus().getState()); + + // Simulate the ZK node going dark and wait for the + // VerifyActiveStatusThread to hang + zkUnreachable.set(true); + + synchronized (hangLock) { + while (!threadHung.get()) { + hangLock.wait(); + } + } + + assertTrue("Unable to perform test because Verify Active Status Thread " + + "did not run", threadHung.get()); + + // Try to transition the RM to standby. Give up after 1000ms. + Thread standby = new Thread(new Runnable() { + @Override + public void run() { + try { + rm.getRMContext().getRMAdminService().transitionToStandby(req); + } catch (IOException ex) { + // OK to exit + } + } + }, "Test Unreachable ZK Thread"); + + standby.start(); + standby.join(1000); + + assertFalse("The thread initiating the transition to standby is hung", + standby.isAlive()); + zkUnreachable.set(false); + } + @Test public void testNoAuthExceptionInNonHAMode() throws Exception { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();