YARN-5694. ZKRMStateStore can prevent the transition to standby if the ZK node is unreachable. Contributed by Daniel Templeton
This commit is contained in:
parent
6d8df4e81e
commit
c95ab6b895
|
@ -405,7 +405,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void closeInternal() throws Exception {
|
||||
protected void closeInternal() throws Exception {
|
||||
if (verifyActiveStatusThread != null) {
|
||||
verifyActiveStatusThread.interrupt();
|
||||
verifyActiveStatusThread.join(1000);
|
||||
|
@ -963,8 +963,9 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
* Helper method that creates fencing node, executes the passed operations,
|
||||
* and deletes the fencing node.
|
||||
*/
|
||||
private synchronized void doStoreMultiWithRetries(
|
||||
final List<Op> opList) throws Exception {
|
||||
@VisibleForTesting
|
||||
synchronized void doStoreMultiWithRetries(final List<Op> opList)
|
||||
throws Exception {
|
||||
final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
|
||||
execOpList.add(createFencingNodePathOp);
|
||||
execOpList.addAll(opList);
|
||||
|
|
|
@ -65,14 +65,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.ZooDefs.Perms;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.Op;
|
||||
import org.apache.zookeeper.data.ACL;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||
|
||||
|
@ -80,7 +83,6 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||
private static final int ZK_TIMEOUT_MS = 1000;
|
||||
|
||||
class TestZKRMStateStoreTester implements RMStateStoreHelper {
|
||||
|
||||
ZooKeeper client;
|
||||
TestZKRMStateStoreInternal store;
|
||||
String workingZnode = "/jira/issue/3077/rmstore";
|
||||
|
@ -91,7 +93,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||
throws Exception {
|
||||
init(conf);
|
||||
start();
|
||||
assertTrue(znodeWorkingPath.equals(workingZnode));
|
||||
assertEquals(workingZnode, znodeWorkingPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -350,6 +352,101 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransitionWithUnreachableZK() throws Exception {
|
||||
final AtomicBoolean zkUnreachable = new AtomicBoolean(false);
|
||||
final AtomicBoolean threadHung = new AtomicBoolean(false);
|
||||
final Object hangLock = new Object();
|
||||
final Configuration conf = createHARMConf("rm1,rm2", "rm1", 1234);
|
||||
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
|
||||
// Create a state store that can simulate losing contact with the ZK node
|
||||
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() {
|
||||
@Override
|
||||
public RMStateStore getRMStateStore() throws Exception {
|
||||
YarnConfiguration storeConf = new YarnConfiguration(conf);
|
||||
storeConf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
|
||||
storeConf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
|
||||
workingZnode);
|
||||
storeConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 500);
|
||||
this.client = createClient();
|
||||
this.store = new TestZKRMStateStoreInternal(storeConf, workingZnode) {
|
||||
@Override
|
||||
synchronized void doStoreMultiWithRetries(final List<Op> opList)
|
||||
throws Exception {
|
||||
if (zkUnreachable.get()) {
|
||||
// Let the test know that it can now proceed
|
||||
threadHung.set(true);
|
||||
|
||||
synchronized (hangLock) {
|
||||
hangLock.notify();
|
||||
}
|
||||
|
||||
// Take a long nap while holding the lock to simulate the ZK node
|
||||
// being unreachable. This behavior models what happens in
|
||||
// super.doStoreMultiWithRetries() when the ZK node it unreachble.
|
||||
// If that behavior changes, then this test should also change or
|
||||
// be phased out.
|
||||
Thread.sleep(60000);
|
||||
} else {
|
||||
// Business as usual
|
||||
super.doStoreMultiWithRetries(opList);
|
||||
}
|
||||
}
|
||||
};
|
||||
return this.store;
|
||||
}
|
||||
};
|
||||
|
||||
// Start with a single RM in HA mode
|
||||
final RMStateStore store = zkTester.getRMStateStore();
|
||||
final MockRM rm = new MockRM(conf, store);
|
||||
rm.start();
|
||||
|
||||
// Make the RM active
|
||||
final StateChangeRequestInfo req = new StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||
|
||||
rm.getRMContext().getRMAdminService().transitionToActive(req);
|
||||
assertEquals("RM with ZKStore didn't start",
|
||||
Service.STATE.STARTED, rm.getServiceState());
|
||||
assertEquals("RM should be Active",
|
||||
HAServiceProtocol.HAServiceState.ACTIVE,
|
||||
rm.getRMContext().getRMAdminService().getServiceStatus().getState());
|
||||
|
||||
// Simulate the ZK node going dark and wait for the
|
||||
// VerifyActiveStatusThread to hang
|
||||
zkUnreachable.set(true);
|
||||
|
||||
synchronized (hangLock) {
|
||||
while (!threadHung.get()) {
|
||||
hangLock.wait();
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue("Unable to perform test because Verify Active Status Thread "
|
||||
+ "did not run", threadHung.get());
|
||||
|
||||
// Try to transition the RM to standby. Give up after 1000ms.
|
||||
Thread standby = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
rm.getRMContext().getRMAdminService().transitionToStandby(req);
|
||||
} catch (IOException ex) {
|
||||
// OK to exit
|
||||
}
|
||||
}
|
||||
}, "Test Unreachable ZK Thread");
|
||||
|
||||
standby.start();
|
||||
standby.join(1000);
|
||||
|
||||
assertFalse("The thread initiating the transition to standby is hung",
|
||||
standby.isAlive());
|
||||
zkUnreachable.set(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoAuthExceptionInNonHAMode() throws Exception {
|
||||
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||
|
|
Loading…
Reference in New Issue