Merge r1618915 from trunk. YARN-2409. RM ActiveToStandBy transition missing stoping previous rmDispatcher. Contributed by Rohith

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1618916 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-08-19 17:50:48 +00:00
parent d926fd12cf
commit aeab638ce2
3 changed files with 29 additions and 0 deletions

View File

@ -193,6 +193,9 @@ Release 2.6.0 - UNRELEASED
YARN-2397. Avoided loading two authentication filters for RM and TS web YARN-2397. Avoided loading two authentication filters for RM and TS web
interfaces. (Varun Vasudev via zjshen) interfaces. (Varun Vasudev via zjshen)
YARN-2409. RM ActiveToStandBy transition missing stoping previous rmDispatcher.
(Rohith via jianhe)
Release 2.5.0 - UNRELEASED Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1161,6 +1161,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
((Service)dispatcher).init(this.conf); ((Service)dispatcher).init(this.conf);
((Service)dispatcher).start(); ((Service)dispatcher).start();
removeService((Service)rmDispatcher); removeService((Service)rmDispatcher);
// Need to stop previous rmDispatcher before assigning new dispatcher
// otherwise causes "AsyncDispatcher event handler" thread leak
((Service) rmDispatcher).stop();
rmDispatcher = dispatcher; rmDispatcher = dispatcher;
addIfService(rmDispatcher); addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher); rmContext.setDispatcher(rmDispatcher);

View File

@ -332,6 +332,10 @@ public class TestRMHA {
rm.adminService.transitionToActive(requestInfo); rm.adminService.transitionToActive(requestInfo);
rm.adminService.transitionToStandby(requestInfo); rm.adminService.transitionToStandby(requestInfo);
MyCountingDispatcher dispatcher =
(MyCountingDispatcher) rm.getRMContext().getDispatcher();
assertTrue(!dispatcher.isStopped());
rm.adminService.transitionToActive(requestInfo); rm.adminService.transitionToActive(requestInfo);
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
((MyCountingDispatcher) rm.getRMContext().getDispatcher()) ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
@ -339,6 +343,11 @@ public class TestRMHA {
assertEquals(errorMessageForService, expectedServiceCount, assertEquals(errorMessageForService, expectedServiceCount,
rm.getServices().size()); rm.getServices().size());
// Keep the dispatcher reference before transitioning to standby
dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher();
rm.adminService.transitionToStandby(requestInfo); rm.adminService.transitionToStandby(requestInfo);
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
((MyCountingDispatcher) rm.getRMContext().getDispatcher()) ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
@ -346,6 +355,8 @@ public class TestRMHA {
assertEquals(errorMessageForService, expectedServiceCount, assertEquals(errorMessageForService, expectedServiceCount,
rm.getServices().size()); rm.getServices().size());
assertTrue(dispatcher.isStopped());
rm.stop(); rm.stop();
} }
@ -492,6 +503,8 @@ public class TestRMHA {
private int eventHandlerCount; private int eventHandlerCount;
private volatile boolean stopped = false;
public MyCountingDispatcher() { public MyCountingDispatcher() {
super("MyCountingDispatcher"); super("MyCountingDispatcher");
this.eventHandlerCount = 0; this.eventHandlerCount = 0;
@ -510,5 +523,15 @@ public class TestRMHA {
public int getEventHandlerCount() { public int getEventHandlerCount() {
return this.eventHandlerCount; return this.eventHandlerCount;
} }
@Override
protected void serviceStop() throws Exception {
this.stopped = true;
super.serviceStop();
}
public boolean isStopped() {
return this.stopped;
}
} }
} }