YARN-2579. Fixed a deadlock issue when EmbeddedElectorService and FatalEventDispatcher try to transition RM to StandBy at the same time. Contributed by Rohith Sharmaks

(cherry picked from commit 395275af86)
This commit is contained in:
Jian He 2014-11-05 16:59:54 -08:00
parent 3bc31e35a6
commit f92ff24f5e
6 changed files with 109 additions and 39 deletions

View File

@ -841,6 +841,10 @@ Release 2.6.0 - UNRELEASED
YARN-2805. Fixed ResourceManager to load HA configs correctly before kerberos YARN-2805. Fixed ResourceManager to load HA configs correctly before kerberos
login. (Wangda Tan via vinodkv) login. (Wangda Tan via vinodkv)
YARN-2579. Fixed a deadlock issue when EmbeddedElectorService and
FatalEventDispatcher try to transition RM to StandBy at the same time.
(Rohith Sharmaks via jianhe)
Release 2.5.2 - UNRELEASED Release 2.5.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -43,8 +43,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
import org.junit.After; import org.junit.After;
@ -173,7 +171,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
verifyConnections(); verifyConnections();
} }
@SuppressWarnings("unchecked")
@Test @Test
public void testAutomaticFailover() public void testAutomaticFailover()
throws YarnException, InterruptedException, IOException { throws YarnException, InterruptedException, IOException {
@ -196,10 +193,7 @@ public class TestRMFailover extends ClientBaseWithFixes {
// so it transitions to standby. // so it transitions to standby.
ResourceManager rm = cluster.getResourceManager( ResourceManager rm = cluster.getResourceManager(
cluster.getActiveRMIndex()); cluster.getActiveRMIndex());
RMFatalEvent event = rm.handleTransitionToStandBy();
new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED,
"Fake RMFatalEvent");
rm.getRMContext().getDispatcher().getEventHandler().handle(event);
int maxWaitingAttempts = 2000; int maxWaitingAttempts = 2000;
while (maxWaitingAttempts-- > 0 ) { while (maxWaitingAttempts-- > 0 ) {
if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) { if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) {

View File

@ -23,7 +23,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
public enum RMFatalEventType { public enum RMFatalEventType {
// Source <- Store // Source <- Store
STATE_STORE_FENCED,
STATE_STORE_OP_FAILED, STATE_STORE_OP_FAILED,
// Source <- Embedded Elector // Source <- Embedded Elector

View File

@ -269,6 +269,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
@VisibleForTesting @VisibleForTesting
protected void setRMStateStore(RMStateStore rmStore) { protected void setRMStateStore(RMStateStore rmStore) {
rmStore.setRMDispatcher(rmDispatcher); rmStore.setRMDispatcher(rmDispatcher);
rmStore.setResourceManager(this);
rmContext.setStateStore(rmStore); rmContext.setStateStore(rmStore);
} }
@ -397,11 +398,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
private EventHandler<SchedulerEvent> schedulerDispatcher; private EventHandler<SchedulerEvent> schedulerDispatcher;
private ApplicationMasterLauncher applicationMasterLauncher; private ApplicationMasterLauncher applicationMasterLauncher;
private ContainerAllocationExpirer containerAllocationExpirer; private ContainerAllocationExpirer containerAllocationExpirer;
private ResourceManager rm;
private boolean recoveryEnabled; private boolean recoveryEnabled;
RMActiveServices() { RMActiveServices(ResourceManager rm) {
super("RMActiveServices"); super("RMActiveServices");
this.rm = rm;
} }
@Override @Override
@ -449,6 +451,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
try { try {
rmStore.init(conf); rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher); rmStore.setRMDispatcher(rmDispatcher);
rmStore.setResourceManager(rm);
} catch (Exception e) { } catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for // the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced // HA and we need to give up master status if we got fenced
@ -729,39 +732,31 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Private @Private
public static class RMFatalEventDispatcher public static class RMFatalEventDispatcher
implements EventHandler<RMFatalEvent> { implements EventHandler<RMFatalEvent> {
private final RMContext rmContext;
private final ResourceManager rm;
public RMFatalEventDispatcher(
RMContext rmContext, ResourceManager resourceManager) {
this.rmContext = rmContext;
this.rm = resourceManager;
}
@Override @Override
public void handle(RMFatalEvent event) { public void handle(RMFatalEvent event) {
LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " + LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
event.getType().name() + ". Cause:\n" + event.getCause()); event.getType().name() + ". Cause:\n" + event.getCause());
if (event.getType() == RMFatalEventType.STATE_STORE_FENCED) { ExitUtil.terminate(1, event.getCause());
LOG.info("RMStateStore has been fenced"); }
}
public void handleTransitionToStandBy() {
if (rmContext.isHAEnabled()) { if (rmContext.isHAEnabled()) {
try { try {
// Transition to standby and reinit active services // Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode"); LOG.info("Transitioning RM to Standby mode");
rm.transitionToStandby(true); transitionToStandby(true);
rm.adminService.resetLeaderElection(); adminService.resetLeaderElection();
return; return;
} catch (Exception e) { } catch (Exception e) {
LOG.fatal("Failed to transition RM to Standby mode."); LOG.fatal("Failed to transition RM to Standby mode.");
ExitUtil.terminate(1, e);
} }
} }
} }
ExitUtil.terminate(1, event.getCause());
}
}
@Private @Private
public static final class ApplicationEventDispatcher implements public static final class ApplicationEventDispatcher implements
EventHandler<RMAppEvent> { EventHandler<RMAppEvent> {
@ -990,7 +985,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
* @throws Exception * @throws Exception
*/ */
protected void createAndInitActiveServices() throws Exception { protected void createAndInitActiveServices() throws Exception {
activeServices = new RMActiveServices(); activeServices = new RMActiveServices(this);
activeServices.init(conf); activeServices.init(conf);
} }
@ -1227,7 +1222,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private Dispatcher setupDispatcher() { private Dispatcher setupDispatcher() {
Dispatcher dispatcher = createDispatcher(); Dispatcher dispatcher = createDispatcher();
dispatcher.register(RMFatalEventType.class, dispatcher.register(RMFatalEventType.class,
new ResourceManager.RMFatalEventDispatcher(this.rmContext, this)); new ResourceManager.RMFatalEventDispatcher());
return dispatcher; return dispatcher;
} }

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@ -87,6 +88,7 @@ public abstract class RMStateStore extends AbstractService {
"AMRMTokenSecretManagerRoot"; "AMRMTokenSecretManagerRoot";
protected static final String VERSION_NODE = "RMVersionNode"; protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode"; protected static final String EPOCH_NODE = "EpochNode";
private ResourceManager resourceManager;
public static final Log LOG = LogFactory.getLog(RMStateStore.class); public static final Log LOG = LogFactory.getLog(RMStateStore.class);
@ -818,13 +820,15 @@ public abstract class RMStateStore extends AbstractService {
* @param failureCause the exception due to which the operation failed * @param failureCause the exception due to which the operation failed
*/ */
protected void notifyStoreOperationFailed(Exception failureCause) { protected void notifyStoreOperationFailed(Exception failureCause) {
RMFatalEventType type;
if (failureCause instanceof StoreFencedException) { if (failureCause instanceof StoreFencedException) {
type = RMFatalEventType.STATE_STORE_FENCED; Thread standByTransitionThread =
new Thread(new StandByTransitionThread());
standByTransitionThread.setName("StandByTransitionThread Handler");
standByTransitionThread.start();
} else { } else {
type = RMFatalEventType.STATE_STORE_OP_FAILED; rmDispatcher.getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause));
} }
rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -866,4 +870,16 @@ public abstract class RMStateStore extends AbstractService {
* @throws Exception * @throws Exception
*/ */
public abstract void deleteStore() throws Exception; public abstract void deleteStore() throws Exception;
public void setResourceManager(ResourceManager rm) {
this.resourceManager = rm;
}
private class StandByTransitionThread implements Runnable {
@Override
public void run() {
LOG.info("RMStateStore has been fenced");
resourceManager.handleTransitionToStandBy();
}
}
} }

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -451,6 +452,67 @@ public class TestRMHA {
checkActiveRMFunctionality(); checkActiveRMFunctionality();
} }
@Test(timeout = 90000)
public void testTransitionedToStandbyShouldNotHang() throws Exception {
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
Configuration conf = new YarnConfiguration(configuration);
MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override
public synchronized void updateApplicationState(ApplicationState appState) {
notifyStoreOperationFailed(new StoreFencedException());
}
};
memStore.init(conf);
rm = new MockRM(conf, memStore) {
@Override
void stopActiveServices() throws Exception {
Thread.sleep(10000);
super.stopActiveServices();
}
};
rm.init(conf);
final StateChangeRequestInfo requestInfo =
new StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
assertEquals(STATE_ERR, HAServiceState.INITIALIZING, rm.adminService
.getServiceStatus().getState());
assertFalse("RM is ready to become active before being started",
rm.adminService.getServiceStatus().isReadyToBecomeActive());
checkMonitorHealth();
rm.start();
checkMonitorHealth();
checkStandbyRMFunctionality();
// 2. Transition to Active.
rm.adminService.transitionToActive(requestInfo);
// 3. Try Transition to standby
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
rm.transitionToStandby(true);
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
t.start();
rm.getRMContext().getStateStore().updateApplicationState(null);
t.join(); // wait for thread to finish
rm.adminService.transitionToStandby(requestInfo);
checkStandbyRMFunctionality();
rm.stop();
}
public void innerTestHAWithRMHostName(boolean includeBindHost) { public void innerTestHAWithRMHostName(boolean includeBindHost) {
//this is run two times, with and without a bind host configured //this is run two times, with and without a bind host configured
if (includeBindHost) { if (includeBindHost) {