YARN-5677. RM should transition to standby when connection is lost for an extended period. (Daniel Templeton via kasha)

(cherry picked from commit aedd5c4c1b)
This commit is contained in:
Karthik Kambatla 2016-10-25 13:15:58 -07:00
parent bd4e5bc501
commit a0fa1f8386
2 changed files with 245 additions and 6 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -39,6 +40,8 @@ import org.apache.zookeeper.data.ACL;
import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@ -54,6 +57,10 @@ public class EmbeddedElectorService extends AbstractService
private byte[] localActiveNodeInfo;
private ActiveStandbyElector elector;
private long zkSessionTimeout;
private Timer zkDisconnectTimer;
@VisibleForTesting
final Object zkDisconnectLock = new Object();
EmbeddedElectorService(RMContext rmContext) {
super(EmbeddedElectorService.class.getName());
@ -80,7 +87,7 @@ public class EmbeddedElectorService extends AbstractService
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
String electionZNode = zkBasePath + "/" + clusterId;
long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
@ -123,6 +130,8 @@ public class EmbeddedElectorService extends AbstractService
@Override
public void becomeActive() throws ServiceFailedException {
cancelDisconnectTimer();
try {
rmContext.getRMAdminService().transitionToActive(req);
} catch (Exception e) {
@ -132,6 +141,8 @@ public class EmbeddedElectorService extends AbstractService
@Override
public void becomeStandby() {
cancelDisconnectTimer();
try {
rmContext.getRMAdminService().transitionToStandby(req);
} catch (Exception e) {
@ -139,13 +150,49 @@ public class EmbeddedElectorService extends AbstractService
}
}
/**
* Stop the disconnect timer. Any running tasks will be allowed to complete.
*/
private void cancelDisconnectTimer() {
synchronized (zkDisconnectLock) {
if (zkDisconnectTimer != null) {
zkDisconnectTimer.cancel();
zkDisconnectTimer = null;
}
}
}
/**
* When the ZK client loses contact with ZK, this method will be called to
* allow the RM to react. Because the loss of connection can be noticed
* before the session timeout happens, it is undesirable to transition
* immediately. Instead the method starts a timer that will wait
* {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before
* initiating the transition into standby state.
*/
@Override
public void enterNeutralMode() {
/**
* Possibly due to transient connection issues. Do nothing.
* TODO: Might want to keep track of how long in this state and transition
* to standby.
*/
LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
+ zkSessionTimeout + " ms if connection is not reestablished.");
// If we've just become disconnected, start a timer. When the time's up,
// we'll transition to standby.
synchronized (zkDisconnectLock) {
if (zkDisconnectTimer == null) {
zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
zkDisconnectTimer.schedule(new TimerTask() {
@Override
public void run() {
synchronized (zkDisconnectLock) {
// Only run if the timer hasn't been cancelled
if (zkDisconnectTimer != null) {
becomeStandby();
}
}
}
}, zkSessionTimeout);
}
}
}
@SuppressWarnings(value = "unchecked")

View File

@ -28,6 +28,15 @@ import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestRMEmbeddedElector extends ClientBaseWithFixes {
private static final Log LOG =
@ -41,6 +50,14 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
private Configuration conf;
private AtomicBoolean callbackCalled;
private enum SyncTestType {
ACTIVE,
STANDBY,
NEUTRAL,
ACTIVE_TIMING,
STANDBY_TIMING
}
@Before
public void setup() throws IOException {
conf = new YarnConfiguration();
@ -79,6 +96,181 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
LOG.info("Stopped RM");
}
/**
* Test that neutral mode plays well with all other transitions.
*
* @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted
*/
@Test
public void testCallbackSynchronization()
throws IOException, InterruptedException {
testCallbackSynchronization(SyncTestType.ACTIVE);
testCallbackSynchronization(SyncTestType.STANDBY);
testCallbackSynchronization(SyncTestType.NEUTRAL);
testCallbackSynchronization(SyncTestType.ACTIVE_TIMING);
testCallbackSynchronization(SyncTestType.STANDBY_TIMING);
}
/**
* Helper method to test that neutral mode plays well with other transitions.
*
* @param type the type of test to run
* @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronization(SyncTestType type)
throws IOException, InterruptedException {
AdminService as = mock(AdminService.class);
RMContext rc = mock(RMContext.class);
Configuration myConf = new Configuration(conf);
myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
when(rc.getRMAdminService()).thenReturn(as);
EmbeddedElectorService ees = new EmbeddedElectorService(rc);
ees.init(myConf);
ees.enterNeutralMode();
switch (type) {
case ACTIVE:
testCallbackSynchronizationActive(as, ees);
break;
case STANDBY:
testCallbackSynchronizationStandby(as, ees);
break;
case NEUTRAL:
testCallbackSynchronizationNeutral(as, ees);
break;
case ACTIVE_TIMING:
testCallbackSynchronizationTimingActive(as, ees);
break;
case STANDBY_TIMING:
testCallbackSynchronizationTimingStandby(as, ees);
break;
default:
fail("Unknown test type: " + type);
break;
}
}
/**
* Helper method to test that neutral mode plays well with an active
* transition.
*
* @param as the admin service
* @param ees the embedded elector service
* @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationActive(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException {
ees.becomeActive();
Thread.sleep(100);
verify(as).transitionToActive((StateChangeRequestInfo)any());
verify(as, never()).transitionToStandby((StateChangeRequestInfo)any());
}
/**
* Helper method to test that neutral mode plays well with a standby
* transition.
*
* @param as the admin service
* @param ees the embedded elector service
* @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationStandby(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException {
ees.becomeStandby();
Thread.sleep(100);
verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any());
verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any());
}
/**
* Helper method to test that neutral mode plays well with itself.
*
* @param as the admin service
* @param ees the embedded elector service
* @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationNeutral(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException {
ees.enterNeutralMode();
Thread.sleep(100);
verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any());
verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any());
}
/**
* Helper method to test that neutral mode does not race with an active
* transition.
*
* @param as the admin service
* @param ees the embedded elector service
* @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationTimingActive(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException {
synchronized (ees.zkDisconnectLock) {
// Sleep while holding the lock so that the timer thread can't do
// anything when it runs. Sleep until we're pretty sure the timer thread
// has tried to run.
Thread.sleep(100);
// While still holding the lock cancel the timer by transitioning. This
// simulates a race where the callback goes to cancel the timer while the
// timer is trying to run.
ees.becomeActive();
}
// Sleep just a little more so that the timer thread can do whatever it's
// going to do, hopefully nothing.
Thread.sleep(50);
verify(as).transitionToActive((StateChangeRequestInfo)any());
verify(as, never()).transitionToStandby((StateChangeRequestInfo)any());
}
/**
* Helper method to test that neutral mode does not race with an active
* transition.
*
* @param as the admin service
* @param ees the embedded elector service
* @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationTimingStandby(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException {
synchronized (ees.zkDisconnectLock) {
// Sleep while holding the lock so that the timer thread can't do
// anything when it runs. Sleep until we're pretty sure the timer thread
// has tried to run.
Thread.sleep(100);
// While still holding the lock cancel the timer by transitioning. This
// simulates a race where the callback goes to cancel the timer while the
// timer is trying to run.
ees.becomeStandby();
}
// Sleep just a little more so that the timer thread can do whatever it's
// going to do, hopefully nothing.
Thread.sleep(50);
verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any());
verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any());
}
private class MockRMWithElector extends MockRM {
private long delayMs = 0;