YARN-3742. YARN RM will shut down if ZKClient creation times out. (Daniel Templeton via kasha)
This commit is contained in:
parent
f7faac8e90
commit
166be0ee95
|
@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
|
||||
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
|
||||
import org.apache.hadoop.yarn.webapp.YarnWebParams;
|
||||
import org.junit.After;
|
||||
|
@ -200,7 +202,8 @@ public class TestRMFailover extends ClientBaseWithFixes {
|
|||
// so it transitions to standby.
|
||||
ResourceManager rm = cluster.getResourceManager(
|
||||
cluster.getActiveRMIndex());
|
||||
rm.handleTransitionToStandByInNewThread();
|
||||
rm.getRMContext().getDispatcher().getEventHandler().handle(
|
||||
new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED, "test"));
|
||||
verifyRMTransitionToStandby(rm);
|
||||
verifyConnections();
|
||||
}
|
||||
|
|
|
@ -108,8 +108,9 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
|
|||
|
||||
elector.ensureParentZNode();
|
||||
if (!isParentZnodeSafe(clusterId)) {
|
||||
notifyFatalError(electionZNode + " znode has invalid data! "+
|
||||
"Might need formatting!");
|
||||
notifyFatalError(String.format("invalid data in znode, %s, " +
|
||||
"which may require the state store to be reformatted",
|
||||
electionZNode));
|
||||
}
|
||||
|
||||
super.serviceInit(conf);
|
||||
|
|
|
@ -304,13 +304,12 @@ public class AdminService extends CompositeService implements
|
|||
// call all refresh*s for active RM to get the updated configurations.
|
||||
refreshAll();
|
||||
} catch (Exception e) {
|
||||
LOG.error("RefreshAll failed so firing fatal event", e);
|
||||
rmContext
|
||||
.getDispatcher()
|
||||
.getEventHandler()
|
||||
.handle(
|
||||
new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED,
|
||||
e));
|
||||
e, "failure to refresh configuration settings"));
|
||||
throw new ServiceFailedException(
|
||||
"Error on refreshAll during transition to Active", e);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.lang.Thread.UncaughtExceptionHandler;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
/**
|
||||
* This class either shuts down {@link ResourceManager} or transitions the
|
||||
|
@ -45,14 +45,19 @@ public class RMCriticalThreadUncaughtExceptionHandler
|
|||
|
||||
@Override
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
LOG.fatal("Critical thread " + t.getName() + " crashed!", e);
|
||||
Exception ex;
|
||||
|
||||
if (HAUtil.isHAEnabled(rmContext.getYarnConfiguration())) {
|
||||
rmContext.getResourceManager().handleTransitionToStandByInNewThread();
|
||||
if (e instanceof Exception) {
|
||||
ex = (Exception)e;
|
||||
} else {
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMFatalEvent(RMFatalEventType.CRITICAL_THREAD_CRASH,
|
||||
new Exception(e)));
|
||||
ex = new YarnException(e);
|
||||
}
|
||||
|
||||
RMFatalEvent event =
|
||||
new RMFatalEvent(RMFatalEventType.CRITICAL_THREAD_CRASH, ex,
|
||||
String.format("a critical thread, %s, that exited unexpectedly",
|
||||
t.getName()));
|
||||
|
||||
rmContext.getDispatcher().getEventHandler().handle(event);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,18 +20,73 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||
|
||||
/**
|
||||
* Event that indicates a non-recoverable error for the resource manager.
|
||||
*/
|
||||
public class RMFatalEvent extends AbstractEvent<RMFatalEventType> {
|
||||
private String cause;
|
||||
private final Exception cause;
|
||||
private final String message;
|
||||
|
||||
public RMFatalEvent(RMFatalEventType rmFatalEventType, String cause) {
|
||||
/**
|
||||
* Create a new event of the given type with the given cause.
|
||||
* @param rmFatalEventType The {@link RMFatalEventType} of the event
|
||||
* @param message a text description of the reason for the event
|
||||
*/
|
||||
public RMFatalEvent(RMFatalEventType rmFatalEventType, String message) {
|
||||
this(rmFatalEventType, null, message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new event of the given type around the given source
|
||||
* {@link Exception}.
|
||||
* @param rmFatalEventType The {@link RMFatalEventType} of the event
|
||||
* @param cause the source exception
|
||||
*/
|
||||
public RMFatalEvent(RMFatalEventType rmFatalEventType, Exception cause) {
|
||||
this(rmFatalEventType, cause, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new event of the given type around the given source
|
||||
* {@link Exception} with the given cause.
|
||||
* @param rmFatalEventType The {@link RMFatalEventType} of the event
|
||||
* @param cause the source exception
|
||||
* @param message a text description of the reason for the event
|
||||
*/
|
||||
public RMFatalEvent(RMFatalEventType rmFatalEventType, Exception cause,
|
||||
String message) {
|
||||
super(rmFatalEventType);
|
||||
this.cause = cause;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public RMFatalEvent(RMFatalEventType rmFatalEventType, Exception cause) {
|
||||
super(rmFatalEventType);
|
||||
this.cause = StringUtils.stringifyException(cause);
|
||||
/**
|
||||
* Get a text description of the reason for the event. If a cause was, that
|
||||
* {@link Exception} will be converted to a {@link String} and included in
|
||||
* the result.
|
||||
* @return a text description of the reason for the event
|
||||
*/
|
||||
public String getExplanation() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
if (message != null) {
|
||||
sb.append(message);
|
||||
|
||||
if (cause != null) {
|
||||
sb.append(": ");
|
||||
}
|
||||
}
|
||||
|
||||
public String getCause() {return this.cause;}
|
||||
if (cause != null) {
|
||||
sb.append(StringUtils.stringifyException(cause));
|
||||
}
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("RMFatalEvent of type %s, caused by %s",
|
||||
getType().name(), getExplanation());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
@InterfaceAudience.Private
|
||||
public enum RMFatalEventType {
|
||||
// Source <- Store
|
||||
STATE_STORE_FENCED,
|
||||
STATE_STORE_OP_FAILED,
|
||||
|
||||
// Source <- Embedded Elector
|
||||
|
|
|
@ -811,15 +811,45 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
}
|
||||
|
||||
@Private
|
||||
public static class RMFatalEventDispatcher
|
||||
implements EventHandler<RMFatalEvent> {
|
||||
|
||||
private class RMFatalEventDispatcher implements EventHandler<RMFatalEvent> {
|
||||
@Override
|
||||
public void handle(RMFatalEvent event) {
|
||||
LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
|
||||
event.getType().name() + ". Cause:\n" + event.getCause());
|
||||
LOG.error("Received " + event);
|
||||
|
||||
ExitUtil.terminate(1, event.getCause());
|
||||
if (HAUtil.isHAEnabled(getConfig())) {
|
||||
// If we're in an HA config, the right answer is always to go into
|
||||
// standby.
|
||||
LOG.warn("Transitioning the resource manager to standby.");
|
||||
handleTransitionToStandByInNewThread();
|
||||
} else {
|
||||
// If we're stand-alone, we probably want to shut down, but the if and
|
||||
// how depends on the event.
|
||||
switch(event.getType()) {
|
||||
case STATE_STORE_FENCED:
|
||||
LOG.fatal("State store fenced even though the resource manager " +
|
||||
"is not configured for high availability. Shutting down this " +
|
||||
"resource manager to protect the integrity of the state store.");
|
||||
ExitUtil.terminate(1, event.getExplanation());
|
||||
break;
|
||||
case STATE_STORE_OP_FAILED:
|
||||
if (YarnConfiguration.shouldRMFailFast(getConfig())) {
|
||||
LOG.fatal("Shutting down the resource manager because a state " +
|
||||
"store operation failed, and the resource manager is " +
|
||||
"configured to fail fast. See the yarn.fail-fast and " +
|
||||
"yarn.resourcemanager.fail-fast properties.");
|
||||
ExitUtil.terminate(1, event.getExplanation());
|
||||
} else {
|
||||
LOG.warn("Ignoring state store operation failure because the " +
|
||||
"resource manager is not configured to fail fast. See the " +
|
||||
"yarn.fail-fast and yarn.resourcemanager.fail-fast " +
|
||||
"properties.");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
LOG.fatal("Shutting down the resource manager.");
|
||||
ExitUtil.terminate(1, event.getExplanation());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -827,7 +857,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
* Transition to standby state in a new thread. The transition operation is
|
||||
* asynchronous to avoid deadlock caused by cyclic dependency.
|
||||
*/
|
||||
public void handleTransitionToStandByInNewThread() {
|
||||
private void handleTransitionToStandByInNewThread() {
|
||||
Thread standByTransitionThread =
|
||||
new Thread(activeServices.standByTransitionRunnable);
|
||||
standByTransitionThread.setName("StandByTransitionThread");
|
||||
|
|
|
@ -1129,18 +1129,18 @@ public abstract class RMStateStore extends AbstractService {
|
|||
Exception failureCause) {
|
||||
boolean isFenced = false;
|
||||
LOG.error("State store operation failed ", failureCause);
|
||||
|
||||
if (HAUtil.isHAEnabled(getConfig())) {
|
||||
LOG.warn("State-store fenced ! Transitioning RM to standby");
|
||||
rmDispatcher.getEventHandler().handle(
|
||||
new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED,
|
||||
failureCause));
|
||||
isFenced = true;
|
||||
resourceManager.handleTransitionToStandByInNewThread();
|
||||
} else if (YarnConfiguration.shouldRMFailFast(getConfig())) {
|
||||
LOG.fatal("Fail RM now due to state-store error!");
|
||||
} else {
|
||||
rmDispatcher.getEventHandler().handle(
|
||||
new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED,
|
||||
failureCause));
|
||||
} else {
|
||||
LOG.warn("Skip the state-store error.");
|
||||
}
|
||||
|
||||
return isFenced;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ public class TestMemoryRMStateStore {
|
|||
store.init(conf);
|
||||
ResourceManager mockRM = mock(ResourceManager.class);
|
||||
store.setResourceManager(mockRM);
|
||||
store.setRMDispatcher(new RMStateStoreTestBase.TestDispatcher());
|
||||
RMDelegationTokenIdentifier mockTokenId =
|
||||
mock(RMDelegationTokenIdentifier.class);
|
||||
store.removeRMDelegationToken(mockTokenId);
|
||||
|
@ -58,6 +59,7 @@ public class TestMemoryRMStateStore {
|
|||
};
|
||||
store.init(conf);
|
||||
store.setResourceManager(mockRM);
|
||||
store.setRMDispatcher(new RMStateStoreTestBase.TestDispatcher());
|
||||
store.removeRMDelegationToken(mockTokenId);
|
||||
assertTrue("RMStateStore should have been in fenced state",
|
||||
store.isFencedState());
|
||||
|
|
Loading…
Reference in New Issue