From a0fa1f8386d90862eebf84366a1f80066c092d43 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Tue, 25 Oct 2016 13:15:58 -0700 Subject: [PATCH] YARN-5677. RM should transition to standby when connection is lost for an extended period. (Daniel Templeton via kasha) (cherry picked from commit aedd5c4c1b6695d46d7660597685a9a71aaffc64) --- .../EmbeddedElectorService.java | 59 +++++- .../TestRMEmbeddedElector.java | 192 ++++++++++++++++++ 2 files changed, 245 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java index 72327e82e9c..88d2e102562 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +40,8 @@ import org.apache.zookeeper.data.ACL; import java.io.IOException; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; @InterfaceAudience.Private @InterfaceStability.Unstable @@ -54,6 +57,10 @@ public class EmbeddedElectorService extends AbstractService private byte[] localActiveNodeInfo; private ActiveStandbyElector elector; + private long zkSessionTimeout; + private Timer zkDisconnectTimer; + @VisibleForTesting + final Object zkDisconnectLock = new Object(); EmbeddedElectorService(RMContext rmContext) { super(EmbeddedElectorService.class.getName()); @@ -80,7 +87,7 @@ public class EmbeddedElectorService extends AbstractService YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH); String electionZNode = zkBasePath + "/" + clusterId; - long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS, + zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); List zkAcls = RMZKUtils.getZKAcls(conf); @@ -123,6 +130,8 @@ public class EmbeddedElectorService extends AbstractService @Override public void becomeActive() throws ServiceFailedException { + cancelDisconnectTimer(); + try { rmContext.getRMAdminService().transitionToActive(req); } catch (Exception e) { @@ -132,6 +141,8 @@ public class EmbeddedElectorService extends AbstractService @Override public void becomeStandby() { + cancelDisconnectTimer(); + try { rmContext.getRMAdminService().transitionToStandby(req); } catch (Exception e) { @@ -139,13 +150,49 @@ public class EmbeddedElectorService extends AbstractService } } + /** + * Stop the disconnect timer. Any running tasks will be allowed to complete. + */ + private void cancelDisconnectTimer() { + synchronized (zkDisconnectLock) { + if (zkDisconnectTimer != null) { + zkDisconnectTimer.cancel(); + zkDisconnectTimer = null; + } + } + } + + /** + * When the ZK client loses contact with ZK, this method will be called to + * allow the RM to react. Because the loss of connection can be noticed + * before the session timeout happens, it is undesirable to transition + * immediately. Instead the method starts a timer that will wait + * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before + * initiating the transition into standby state. + */ @Override public void enterNeutralMode() { - /** - * Possibly due to transient connection issues. Do nothing. - * TODO: Might want to keep track of how long in this state and transition - * to standby. - */ + LOG.warn("Lost contact with Zookeeper. Transitioning to standby in " + + zkSessionTimeout + " ms if connection is not reestablished."); + + // If we've just become disconnected, start a timer. When the time's up, + // we'll transition to standby. + synchronized (zkDisconnectLock) { + if (zkDisconnectTimer == null) { + zkDisconnectTimer = new Timer("Zookeeper disconnect timer"); + zkDisconnectTimer.schedule(new TimerTask() { + @Override + public void run() { + synchronized (zkDisconnectLock) { + // Only run if the timer hasn't been cancelled + if (zkDisconnectTimer != null) { + becomeStandby(); + } + } + } + }, zkSessionTimeout); + } + } } @SuppressWarnings(value = "unchecked") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index 20b1c0e0060..c6483470015 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -28,6 +28,15 @@ import org.junit.Test; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestRMEmbeddedElector extends ClientBaseWithFixes { private static final Log LOG = @@ -41,6 +50,14 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { private Configuration conf; private AtomicBoolean callbackCalled; + private enum SyncTestType { + ACTIVE, + STANDBY, + NEUTRAL, + ACTIVE_TIMING, + STANDBY_TIMING + } + @Before public void setup() throws IOException { conf = new YarnConfiguration(); @@ -79,6 +96,181 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { LOG.info("Stopped RM"); } + /** + * Test that neutral mode plays well with all other transitions. + * + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + @Test + public void testCallbackSynchronization() + throws IOException, InterruptedException { + testCallbackSynchronization(SyncTestType.ACTIVE); + testCallbackSynchronization(SyncTestType.STANDBY); + testCallbackSynchronization(SyncTestType.NEUTRAL); + testCallbackSynchronization(SyncTestType.ACTIVE_TIMING); + testCallbackSynchronization(SyncTestType.STANDBY_TIMING); + } + + /** + * Helper method to test that neutral mode plays well with other transitions. + * + * @param type the type of test to run + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronization(SyncTestType type) + throws IOException, InterruptedException { + AdminService as = mock(AdminService.class); + RMContext rc = mock(RMContext.class); + Configuration myConf = new Configuration(conf); + + myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50); + when(rc.getRMAdminService()).thenReturn(as); + + EmbeddedElectorService ees = new EmbeddedElectorService(rc); + ees.init(myConf); + + ees.enterNeutralMode(); + + switch (type) { + case ACTIVE: + testCallbackSynchronizationActive(as, ees); + break; + case STANDBY: + testCallbackSynchronizationStandby(as, ees); + break; + case NEUTRAL: + testCallbackSynchronizationNeutral(as, ees); + break; + case ACTIVE_TIMING: + testCallbackSynchronizationTimingActive(as, ees); + break; + case STANDBY_TIMING: + testCallbackSynchronizationTimingStandby(as, ees); + break; + default: + fail("Unknown test type: " + type); + break; + } + } + + /** + * Helper method to test that neutral mode plays well with an active + * transition. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationActive(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + ees.becomeActive(); + + Thread.sleep(100); + + verify(as).transitionToActive((StateChangeRequestInfo)any()); + verify(as, never()).transitionToStandby((StateChangeRequestInfo)any()); + } + + /** + * Helper method to test that neutral mode plays well with a standby + * transition. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationStandby(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + ees.becomeStandby(); + + Thread.sleep(100); + + verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any()); + verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any()); + } + + /** + * Helper method to test that neutral mode plays well with itself. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationNeutral(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + ees.enterNeutralMode(); + + Thread.sleep(100); + + verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any()); + verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any()); + } + + /** + * Helper method to test that neutral mode does not race with an active + * transition. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationTimingActive(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + synchronized (ees.zkDisconnectLock) { + // Sleep while holding the lock so that the timer thread can't do + // anything when it runs. Sleep until we're pretty sure the timer thread + // has tried to run. + Thread.sleep(100); + // While still holding the lock cancel the timer by transitioning. This + // simulates a race where the callback goes to cancel the timer while the + // timer is trying to run. + ees.becomeActive(); + } + + // Sleep just a little more so that the timer thread can do whatever it's + // going to do, hopefully nothing. + Thread.sleep(50); + + verify(as).transitionToActive((StateChangeRequestInfo)any()); + verify(as, never()).transitionToStandby((StateChangeRequestInfo)any()); + } + + /** + * Helper method to test that neutral mode does not race with an active + * transition. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationTimingStandby(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + synchronized (ees.zkDisconnectLock) { + // Sleep while holding the lock so that the timer thread can't do + // anything when it runs. Sleep until we're pretty sure the timer thread + // has tried to run. + Thread.sleep(100); + // While still holding the lock cancel the timer by transitioning. This + // simulates a race where the callback goes to cancel the timer while the + // timer is trying to run. + ees.becomeStandby(); + } + + // Sleep just a little more so that the timer thread can do whatever it's + // going to do, hopefully nothing. + Thread.sleep(50); + + verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any()); + verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any()); + } + private class MockRMWithElector extends MockRM { private long delayMs = 0;