YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1557249 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5ecbc62eee
commit
a7632fa9e2
|
@ -95,7 +95,7 @@ public class CompositeService extends AbstractService {
|
|||
|
||||
protected synchronized boolean removeService(Service service) {
|
||||
synchronized (serviceList) {
|
||||
return serviceList.add(service);
|
||||
return serviceList.remove(service);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -295,6 +295,9 @@ Release 2.4.0 - UNRELEASED
|
|||
YARN-1293. Fixed TestContainerLaunch#testInvalidEnvSyntaxDiagnostics failure
|
||||
caused by non-English system locale. (Tsuyoshi OZAWA via jianhe)
|
||||
|
||||
YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong
|
||||
via kasha)
|
||||
|
||||
Release 2.3.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -338,6 +338,33 @@ public class TestCompositeService {
|
|||
1, testService.getServices().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveService() {
|
||||
CompositeService testService = new CompositeService("TestService") {
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) {
|
||||
Integer notAService = new Integer(0);
|
||||
assertFalse("Added an integer as a service",
|
||||
addIfService(notAService));
|
||||
|
||||
Service service1 = new AbstractService("Service1") {};
|
||||
addIfService(service1);
|
||||
|
||||
Service service2 = new AbstractService("Service2") {};
|
||||
addIfService(service2);
|
||||
|
||||
Service service3 = new AbstractService("Service3") {};
|
||||
addIfService(service3);
|
||||
|
||||
removeService(service1);
|
||||
}
|
||||
};
|
||||
|
||||
testService.init(new Configuration());
|
||||
assertEquals("Incorrect number of services",
|
||||
2, testService.getServices().size());
|
||||
}
|
||||
|
||||
public static class CompositeServiceAddingAChild extends CompositeService{
|
||||
Service child;
|
||||
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.security.SecurityUtil;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
|
@ -180,13 +181,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
this.conf = conf;
|
||||
this.rmContext = new RMContextImpl();
|
||||
|
||||
rmDispatcher = createDispatcher();
|
||||
// register the handlers for all AlwaysOn services using setupDispatcher().
|
||||
rmDispatcher = setupDispatcher();
|
||||
addIfService(rmDispatcher);
|
||||
rmContext.setDispatcher(rmDispatcher);
|
||||
|
||||
rmDispatcher.register(RMFatalEventType.class,
|
||||
new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
|
||||
|
||||
adminService = createAdminService();
|
||||
addService(adminService);
|
||||
rmContext.setRMAdminService(adminService);
|
||||
|
@ -832,6 +831,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||
stopActiveServices();
|
||||
if (initialize) {
|
||||
resetDispatcher();
|
||||
createAndInitActiveServices();
|
||||
}
|
||||
}
|
||||
|
@ -994,4 +994,24 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
YarnConfiguration.YARN_HTTP_POLICY_KEY,
|
||||
YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the handlers for alwaysOn services
|
||||
*/
|
||||
private Dispatcher setupDispatcher() {
|
||||
Dispatcher dispatcher = createDispatcher();
|
||||
dispatcher.register(RMFatalEventType.class,
|
||||
new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
|
||||
return dispatcher;
|
||||
}
|
||||
|
||||
private void resetDispatcher() {
|
||||
Dispatcher dispatcher = setupDispatcher();
|
||||
((Service)dispatcher).init(this.conf);
|
||||
((Service)dispatcher).start();
|
||||
removeService((Service)rmDispatcher);
|
||||
rmDispatcher = dispatcher;
|
||||
addIfService(rmDispatcher);
|
||||
rmContext.setDispatcher(rmDispatcher);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,8 +26,11 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -222,4 +225,81 @@ public class TestRMHA {
|
|||
checkMonitorHealth();
|
||||
checkActiveRMFunctionality();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRMDispatcherForHA() throws IOException {
|
||||
String errorMessageForEventHandler =
|
||||
"Expect to get the same number of handlers";
|
||||
String errorMessageForService = "Expect to get the same number of services";
|
||||
Configuration conf = new YarnConfiguration(configuration);
|
||||
rm = new MockRM(conf) {
|
||||
@Override
|
||||
protected Dispatcher createDispatcher() {
|
||||
return new MyCountingDispatcher();
|
||||
}
|
||||
};
|
||||
rm.init(conf);
|
||||
int expectedEventHandlerCount =
|
||||
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
|
||||
.getEventHandlerCount();
|
||||
int expectedServiceCount = rm.getServices().size();
|
||||
assertTrue(expectedEventHandlerCount != 0);
|
||||
|
||||
StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||
|
||||
assertEquals(STATE_ERR, HAServiceState.INITIALIZING,
|
||||
rm.adminService.getServiceStatus().getState());
|
||||
assertFalse("RM is ready to become active before being started",
|
||||
rm.adminService.getServiceStatus().isReadyToBecomeActive());
|
||||
rm.start();
|
||||
|
||||
//call transitions to standby and active a couple of times
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
|
||||
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
|
||||
.getEventHandlerCount());
|
||||
assertEquals(errorMessageForService, expectedServiceCount,
|
||||
rm.getServices().size());
|
||||
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
|
||||
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
|
||||
.getEventHandlerCount());
|
||||
assertEquals(errorMessageForService, expectedServiceCount,
|
||||
rm.getServices().size());
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
class MyCountingDispatcher extends AbstractService implements Dispatcher {
|
||||
|
||||
private int eventHandlerCount;
|
||||
|
||||
public MyCountingDispatcher() {
|
||||
super("MyCountingDispatcher");
|
||||
this.eventHandlerCount = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventHandler getEventHandler() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(Class<? extends Enum> eventType, EventHandler handler) {
|
||||
this.eventHandlerCount ++;
|
||||
}
|
||||
|
||||
public int getEventHandlerCount() {
|
||||
return this.eventHandlerCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue