YARN-3893. Both RM in active state when Admin#transitionToActive failure from refeshAll() (Bibin A Chundatt via rohithsharmaks)
This commit is contained in:
parent
095ab9ab5f
commit
7d6687fe76
|
@ -868,6 +868,9 @@ Release 2.7.2 - UNRELEASED
|
|||
YARN-3857: Memory leak in ResourceManager with SIMPLE mode.
|
||||
(mujunchao via zxu)
|
||||
|
||||
YARN-3893. Both RM in active state when Admin#transitionToActive failure
|
||||
from refeshAll() (Bibin A Chundatt via rohithsharmaks)
|
||||
|
||||
Release 2.7.1 - 2015-07-06
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -297,6 +297,7 @@ public class AdminService extends CompositeService implements
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public synchronized void transitionToActive(
|
||||
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
|
||||
|
@ -312,10 +313,6 @@ public class AdminService extends CompositeService implements
|
|||
checkHaStateChange(reqInfo);
|
||||
try {
|
||||
rm.transitionToActive();
|
||||
// call all refresh*s for active RM to get the updated configurations.
|
||||
refreshAll();
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"transitionToActive", "RMHAProtocolService");
|
||||
} catch (Exception e) {
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
|
||||
"", "RMHAProtocolService",
|
||||
|
@ -323,6 +320,21 @@ public class AdminService extends CompositeService implements
|
|||
throw new ServiceFailedException(
|
||||
"Error when transitioning to Active mode", e);
|
||||
}
|
||||
try {
|
||||
// call all refresh*s for active RM to get the updated configurations.
|
||||
refreshAll();
|
||||
} catch (Exception e) {
|
||||
LOG.error("RefreshAll failed so firing fatal event", e);
|
||||
rmContext
|
||||
.getDispatcher()
|
||||
.getEventHandler()
|
||||
.handle(
|
||||
new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e));
|
||||
throw new ServiceFailedException(
|
||||
"Error on refreshAll during transistion to Active", e);
|
||||
}
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
|
||||
"RMHAProtocolService");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -26,5 +26,8 @@ public enum RMFatalEventType {
|
|||
STATE_STORE_OP_FAILED,
|
||||
|
||||
// Source <- Embedded Elector
|
||||
EMBEDDED_ELECTOR_FAILED
|
||||
EMBEDDED_ELECTOR_FAILED,
|
||||
|
||||
// Source <- Admin Service
|
||||
TRANSITION_TO_ACTIVE_FAILED
|
||||
}
|
||||
|
|
|
@ -43,6 +43,8 @@ import org.apache.hadoop.service.AbstractService;
|
|||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
|
@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.junit.Assert;
|
||||
|
@ -577,6 +580,56 @@ public class TestRMHA {
|
|||
assertEquals(0, rm.getRMContext().getRMApps().size());
|
||||
}
|
||||
|
||||
@Test(timeout = 90000)
|
||||
public void testTransitionedToActiveRefreshFail() throws Exception {
|
||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
YarnConfiguration conf = new YarnConfiguration(configuration);
|
||||
configuration = new CapacitySchedulerConfiguration(conf);
|
||||
rm = new MockRM(configuration) {
|
||||
@Override
|
||||
protected AdminService createAdminService() {
|
||||
return new AdminService(this, getRMContext()) {
|
||||
@Override
|
||||
protected void setConfig(Configuration conf) {
|
||||
super.setConfig(configuration);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Dispatcher createDispatcher() {
|
||||
return new FailFastDispatcher();
|
||||
}
|
||||
};
|
||||
|
||||
rm.init(configuration);
|
||||
rm.start();
|
||||
final StateChangeRequestInfo requestInfo =
|
||||
new StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||
|
||||
configuration.set("yarn.scheduler.capacity.root.default.capacity", "100");
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
assertEquals(HAServiceState.STANDBY, rm.getRMContext().getHAServiceState());
|
||||
configuration.set("yarn.scheduler.capacity.root.default.capacity", "200");
|
||||
try {
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
} catch (Exception e) {
|
||||
assertTrue("Error on refreshAll during transistion to Active".contains(e
|
||||
.getMessage()));
|
||||
}
|
||||
FailFastDispatcher dispatcher =
|
||||
((FailFastDispatcher) rm.rmContext.getDispatcher());
|
||||
dispatcher.await();
|
||||
assertEquals(1, dispatcher.getEventCount());
|
||||
// Making correct conf and check the state
|
||||
configuration.set("yarn.scheduler.capacity.root.default.capacity", "100");
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
assertEquals(HAServiceState.ACTIVE, rm.getRMContext().getHAServiceState());
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
assertEquals(HAServiceState.STANDBY, rm.getRMContext().getHAServiceState());
|
||||
}
|
||||
|
||||
public void innerTestHAWithRMHostName(boolean includeBindHost) {
|
||||
//this is run two times, with and without a bind host configured
|
||||
if (includeBindHost) {
|
||||
|
@ -713,4 +766,22 @@ public class TestRMHA {
|
|||
return this.stopped;
|
||||
}
|
||||
}
|
||||
|
||||
class FailFastDispatcher extends DrainDispatcher {
|
||||
int eventreceived = 0;
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Override
|
||||
protected void dispatch(Event event) {
|
||||
if (event.getType() == RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED) {
|
||||
eventreceived++;
|
||||
} else {
|
||||
super.dispatch(event);
|
||||
}
|
||||
}
|
||||
|
||||
public int getEventCount() {
|
||||
return eventreceived;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue