YARN-3893. Both RM in active state when Admin#transitionToActive failure from refreshAll() (Bibin A Chundatt via rohithsharmaks)
(cherry picked from commit 7d6687fe76)
This commit is contained in:
parent
3ab43accaf
commit
c2ed7e4a09
|
@ -61,6 +61,9 @@ Release 2.7.2 - UNRELEASED
|
||||||
YARN-3857: Memory leak in ResourceManager with SIMPLE mode.
|
YARN-3857: Memory leak in ResourceManager with SIMPLE mode.
|
||||||
(mujunchao via zxu)
|
(mujunchao via zxu)
|
||||||
|
|
||||||
|
YARN-3893. Both RM in active state when Admin#transitionToActive failure
|
||||||
|
from refreshAll() (Bibin A Chundatt via rohithsharmaks)
|
||||||
|
|
||||||
Release 2.7.1 - 2015-07-06
|
Release 2.7.1 - 2015-07-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -285,6 +285,7 @@ public class AdminService extends CompositeService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public synchronized void transitionToActive(
|
public synchronized void transitionToActive(
|
||||||
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
|
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
|
||||||
|
@ -300,10 +301,6 @@ public class AdminService extends CompositeService implements
|
||||||
checkHaStateChange(reqInfo);
|
checkHaStateChange(reqInfo);
|
||||||
try {
|
try {
|
||||||
rm.transitionToActive();
|
rm.transitionToActive();
|
||||||
// call all refresh*s for active RM to get the updated configurations.
|
|
||||||
refreshAll();
|
|
||||||
RMAuditLogger.logSuccess(user.getShortUserName(),
|
|
||||||
"transitionToActive", "RMHAProtocolService");
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
|
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
|
||||||
"", "RMHAProtocolService",
|
"", "RMHAProtocolService",
|
||||||
|
@ -311,6 +308,21 @@ public class AdminService extends CompositeService implements
|
||||||
throw new ServiceFailedException(
|
throw new ServiceFailedException(
|
||||||
"Error when transitioning to Active mode", e);
|
"Error when transitioning to Active mode", e);
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
// call all refresh*s for active RM to get the updated configurations.
|
||||||
|
refreshAll();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("RefreshAll failed so firing fatal event", e);
|
||||||
|
rmContext
|
||||||
|
.getDispatcher()
|
||||||
|
.getEventHandler()
|
||||||
|
.handle(
|
||||||
|
new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e));
|
||||||
|
throw new ServiceFailedException(
|
||||||
|
"Error on refreshAll during transistion to Active", e);
|
||||||
|
}
|
||||||
|
RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
|
||||||
|
"RMHAProtocolService");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -26,5 +26,8 @@ public enum RMFatalEventType {
|
||||||
STATE_STORE_OP_FAILED,
|
STATE_STORE_OP_FAILED,
|
||||||
|
|
||||||
// Source <- Embedded Elector
|
// Source <- Embedded Elector
|
||||||
EMBEDDED_ELECTOR_FAILED
|
EMBEDDED_ELECTOR_FAILED,
|
||||||
|
|
||||||
|
// Source <- Admin Service
|
||||||
|
TRANSITION_TO_ACTIVE_FAILED
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,8 @@ import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.Event;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
|
@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.codehaus.jettison.json.JSONException;
|
import org.codehaus.jettison.json.JSONException;
|
||||||
import org.codehaus.jettison.json.JSONObject;
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -577,6 +580,56 @@ public class TestRMHA {
|
||||||
assertEquals(0, rm.getRMContext().getRMApps().size());
|
assertEquals(0, rm.getRMContext().getRMApps().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 90000)
|
||||||
|
public void testTransitionedToActiveRefreshFail() throws Exception {
|
||||||
|
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||||
|
YarnConfiguration conf = new YarnConfiguration(configuration);
|
||||||
|
configuration = new CapacitySchedulerConfiguration(conf);
|
||||||
|
rm = new MockRM(configuration) {
|
||||||
|
@Override
|
||||||
|
protected AdminService createAdminService() {
|
||||||
|
return new AdminService(this, getRMContext()) {
|
||||||
|
@Override
|
||||||
|
protected void setConfig(Configuration conf) {
|
||||||
|
super.setConfig(configuration);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Dispatcher createDispatcher() {
|
||||||
|
return new FailFastDispatcher();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
rm.init(configuration);
|
||||||
|
rm.start();
|
||||||
|
final StateChangeRequestInfo requestInfo =
|
||||||
|
new StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||||
|
|
||||||
|
configuration.set("yarn.scheduler.capacity.root.default.capacity", "100");
|
||||||
|
rm.adminService.transitionToStandby(requestInfo);
|
||||||
|
assertEquals(HAServiceState.STANDBY, rm.getRMContext().getHAServiceState());
|
||||||
|
configuration.set("yarn.scheduler.capacity.root.default.capacity", "200");
|
||||||
|
try {
|
||||||
|
rm.adminService.transitionToActive(requestInfo);
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertTrue("Error on refreshAll during transistion to Active".contains(e
|
||||||
|
.getMessage()));
|
||||||
|
}
|
||||||
|
FailFastDispatcher dispatcher =
|
||||||
|
((FailFastDispatcher) rm.rmContext.getDispatcher());
|
||||||
|
dispatcher.await();
|
||||||
|
assertEquals(1, dispatcher.getEventCount());
|
||||||
|
// Making correct conf and check the state
|
||||||
|
configuration.set("yarn.scheduler.capacity.root.default.capacity", "100");
|
||||||
|
rm.adminService.transitionToActive(requestInfo);
|
||||||
|
assertEquals(HAServiceState.ACTIVE, rm.getRMContext().getHAServiceState());
|
||||||
|
rm.adminService.transitionToStandby(requestInfo);
|
||||||
|
assertEquals(HAServiceState.STANDBY, rm.getRMContext().getHAServiceState());
|
||||||
|
}
|
||||||
|
|
||||||
public void innerTestHAWithRMHostName(boolean includeBindHost) {
|
public void innerTestHAWithRMHostName(boolean includeBindHost) {
|
||||||
//this is run two times, with and without a bind host configured
|
//this is run two times, with and without a bind host configured
|
||||||
if (includeBindHost) {
|
if (includeBindHost) {
|
||||||
|
@ -713,4 +766,22 @@ public class TestRMHA {
|
||||||
return this.stopped;
|
return this.stopped;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class FailFastDispatcher extends DrainDispatcher {
|
||||||
|
int eventreceived = 0;
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
@Override
|
||||||
|
protected void dispatch(Event event) {
|
||||||
|
if (event.getType() == RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED) {
|
||||||
|
eventreceived++;
|
||||||
|
} else {
|
||||||
|
super.dispatch(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getEventCount() {
|
||||||
|
return eventreceived;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue