YARN-6202. Configuration item Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY is disregarded
(Contributed by Yufei Gu via Daniel Templeton)
This commit is contained in:
parent
7e075a50e3
commit
dd43b895c2
|
@ -280,8 +280,6 @@ public class MRAppMaster extends CompositeService {
|
||||||
// create the job classloader if enabled
|
// create the job classloader if enabled
|
||||||
createJobClassLoader(conf);
|
createJobClassLoader(conf);
|
||||||
|
|
||||||
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
|
|
||||||
|
|
||||||
initJobCredentialsAndUGI(conf);
|
initJobCredentialsAndUGI(conf);
|
||||||
|
|
||||||
dispatcher = createDispatcher();
|
dispatcher = createDispatcher();
|
||||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
|
||||||
|
|
||||||
|
@ -120,8 +119,6 @@ public class JobHistoryServer extends CompositeService {
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
Configuration config = new YarnConfiguration(conf);
|
Configuration config = new YarnConfiguration(conf);
|
||||||
|
|
||||||
config.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
|
|
||||||
|
|
||||||
// This is required for WebApps to use https if enabled.
|
// This is required for WebApps to use https if enabled.
|
||||||
MRWebAppUtil.initialize(getConfig());
|
MRWebAppUtil.initialize(getConfig());
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.util.ShutdownHookManager;
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -72,7 +71,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
||||||
|
|
||||||
private Thread eventHandlingThread;
|
private Thread eventHandlingThread;
|
||||||
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
|
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
|
||||||
private boolean exitOnDispatchException;
|
private boolean exitOnDispatchException = true;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The thread name for dispatcher.
|
* The thread name for dispatcher.
|
||||||
|
@ -131,12 +130,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@VisibleForTesting
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
public void disableExitOnDispatchException() {
|
||||||
this.exitOnDispatchException =
|
exitOnDispatchException = false;
|
||||||
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
|
|
||||||
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
|
|
||||||
super.serviceInit(conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -31,15 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
@Evolving
|
@Evolving
|
||||||
public interface Dispatcher {
|
public interface Dispatcher {
|
||||||
|
|
||||||
// Configuration to make sure dispatcher crashes but doesn't do system-exit in
|
|
||||||
// case of errors. By default, it should be false, so that tests are not
|
|
||||||
// affected. For all daemons it should be explicitly set to true so that
|
|
||||||
// daemons can crash instead of hanging around.
|
|
||||||
public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
|
|
||||||
"yarn.dispatcher.exit-on-error";
|
|
||||||
|
|
||||||
public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;
|
|
||||||
|
|
||||||
EventHandler<Event> getEventHandler();
|
EventHandler<Event> getEventHandler();
|
||||||
|
|
||||||
void register(Class<? extends Enum> eventType, EventHandler handler);
|
void register(Class<? extends Enum> eventType, EventHandler handler);
|
||||||
|
|
|
@ -18,9 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.event;
|
package org.apache.hadoop.yarn.event;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.util.ShutdownHookManager;
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
@ -44,7 +44,7 @@ public class EventDispatcher<T extends Event> extends
|
||||||
new LinkedBlockingDeque<>();
|
new LinkedBlockingDeque<>();
|
||||||
private final Thread eventProcessor;
|
private final Thread eventProcessor;
|
||||||
private volatile boolean stopped = false;
|
private volatile boolean stopped = false;
|
||||||
private boolean shouldExitOnError = false;
|
private boolean shouldExitOnError = true;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(EventDispatcher.class);
|
private static final Log LOG = LogFactory.getLog(EventDispatcher.class);
|
||||||
|
|
||||||
|
@ -91,14 +91,6 @@ public class EventDispatcher<T extends Event> extends
|
||||||
this.eventProcessor.setName(getName() + ":Event Processor");
|
this.eventProcessor.setName(getName() + ":Event Processor");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
|
||||||
this.shouldExitOnError =
|
|
||||||
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
|
|
||||||
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
|
|
||||||
super.serviceInit(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
this.eventProcessor.start();
|
this.eventProcessor.start();
|
||||||
|
@ -134,4 +126,9 @@ public class EventDispatcher<T extends Event> extends
|
||||||
LOG.info("Interrupted. Trying to exit gracefully.");
|
LOG.info("Interrupted. Trying to exit gracefully.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void disableExitOnError() {
|
||||||
|
shouldExitOnError = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,8 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.event;
|
package org.apache.hadoop.yarn.event;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
@ -36,13 +34,8 @@ public class DrainDispatcher extends AsyncDispatcher {
|
||||||
super(eventQueue);
|
super(eventQueue);
|
||||||
this.queue = eventQueue;
|
this.queue = eventQueue;
|
||||||
this.mutex = this;
|
this.mutex = this;
|
||||||
}
|
// Disable system exit since this class is only for unit tests.
|
||||||
|
disableExitOnDispatchException();
|
||||||
@Override
|
|
||||||
public void serviceInit(Configuration conf)
|
|
||||||
throws Exception {
|
|
||||||
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
|
|
||||||
super.serviceInit(conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -299,9 +299,6 @@ public class NodeManager extends CompositeService
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
|
||||||
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
|
|
||||||
|
|
||||||
rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration
|
rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration
|
||||||
.RM_WORK_PRESERVING_RECOVERY_ENABLED,
|
.RM_WORK_PRESERVING_RECOVERY_ENABLED,
|
||||||
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
|
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
|
||||||
|
|
|
@ -68,6 +68,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
||||||
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
|
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
|
||||||
super(context, exec, deletionContext, nodeStatusUpdater, metrics,
|
super(context, exec, deletionContext, nodeStatusUpdater, metrics,
|
||||||
dirsHandler);
|
dirsHandler);
|
||||||
|
dispatcher.disableExitOnDispatchException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -392,6 +392,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
stateStore.start();
|
stateStore.start();
|
||||||
Context context = createContext(conf, stateStore);
|
Context context = createContext(conf, stateStore);
|
||||||
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
|
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
|
||||||
|
cm.dispatcher.disableExitOnDispatchException();
|
||||||
cm.init(conf);
|
cm.init(conf);
|
||||||
cm.start();
|
cm.start();
|
||||||
// add an application by starting a container
|
// add an application by starting a container
|
||||||
|
@ -732,7 +733,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
return new ContainerManagerImpl(context,
|
ContainerManagerImpl containerManager = new ContainerManagerImpl(context,
|
||||||
mock(ContainerExecutor.class), mock(DeletionService.class),
|
mock(ContainerExecutor.class), mock(DeletionService.class),
|
||||||
mock(NodeStatusUpdater.class), metrics, null) {
|
mock(NodeStatusUpdater.class), metrics, null) {
|
||||||
@Override
|
@Override
|
||||||
|
@ -767,5 +768,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
containerManager.dispatcher.disableExitOnDispatchException();
|
||||||
|
return containerManager;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,7 +100,6 @@ import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -1759,7 +1758,6 @@ public class TestResourceLocalizationService {
|
||||||
sDirs[i] = localDirs.get(i).toString();
|
sDirs[i] = localDirs.get(i).toString();
|
||||||
}
|
}
|
||||||
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
|
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
|
||||||
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
|
|
||||||
|
|
||||||
DrainDispatcher dispatcher = new DrainDispatcher();
|
DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
|
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
|
||||||
|
|
|
@ -580,7 +580,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
activeServiceContext = new RMActiveServiceContext();
|
activeServiceContext = new RMActiveServiceContext();
|
||||||
rmContext.setActiveServiceContext(activeServiceContext);
|
rmContext.setActiveServiceContext(activeServiceContext);
|
||||||
|
|
||||||
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
|
|
||||||
rmSecretManagerService = createRMSecretManagerService();
|
rmSecretManagerService = createRMSecretManagerService();
|
||||||
addService(rmSecretManagerService);
|
addService(rmSecretManagerService);
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -67,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
@ -1273,4 +1273,12 @@ public class MockRM extends ResourceManager {
|
||||||
false, false, null, 0, null, true, priority, null, applicationTimeouts,
|
false, false, null, 0, null, true, priority, null, applicationTimeouts,
|
||||||
null);
|
null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
super.serviceInit(conf);
|
||||||
|
if (getRmDispatcher() instanceof AsyncDispatcher) {
|
||||||
|
((AsyncDispatcher) getRmDispatcher()).disableExitOnDispatchException();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ public class TestRMDispatcher {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
EventDispatcher schedulerDispatcher =
|
EventDispatcher schedulerDispatcher =
|
||||||
new EventDispatcher(sched, sched.getClass().getName());
|
new EventDispatcher(sched, sched.getClass().getName());
|
||||||
|
schedulerDispatcher.disableExitOnError();
|
||||||
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
|
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
|
||||||
rmDispatcher.init(conf);
|
rmDispatcher.init(conf);
|
||||||
rmDispatcher.start();
|
rmDispatcher.start();
|
||||||
|
|
|
@ -71,6 +71,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
init(conf);
|
init(conf);
|
||||||
Assert.assertNull(fs);
|
Assert.assertNull(fs);
|
||||||
assertTrue(workingDirPathURI.equals(fsWorkingPath));
|
assertTrue(workingDirPathURI.equals(fsWorkingPath));
|
||||||
|
dispatcher.disableExitOnDispatchException();
|
||||||
start();
|
start();
|
||||||
Assert.assertNotNull(fs);
|
Assert.assertNotNull(fs);
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,6 +159,7 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
|
||||||
stateStore = new LeveldbRMStateStore();
|
stateStore = new LeveldbRMStateStore();
|
||||||
stateStore.init(conf);
|
stateStore.init(conf);
|
||||||
stateStore.start();
|
stateStore.start();
|
||||||
|
stateStore.dispatcher.disableExitOnDispatchException();
|
||||||
return stateStore;
|
return stateStore;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -118,6 +118,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
throws Exception {
|
throws Exception {
|
||||||
setResourceManager(new ResourceManager());
|
setResourceManager(new ResourceManager());
|
||||||
init(conf);
|
init(conf);
|
||||||
|
dispatcher.disableExitOnDispatchException();
|
||||||
start();
|
start();
|
||||||
assertTrue(znodeWorkingPath.equals(workingZnode));
|
assertTrue(znodeWorkingPath.equals(workingZnode));
|
||||||
}
|
}
|
||||||
|
|
|
@ -807,8 +807,8 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
||||||
try {
|
try {
|
||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
|
|
||||||
DrainDispatcher privateDispatcher = new DrainDispatcher();
|
DrainDispatcher privateDispatcher = new DrainDispatcher();
|
||||||
|
privateDispatcher.disableExitOnDispatchException();
|
||||||
SleepHandler sleepHandler = new SleepHandler();
|
SleepHandler sleepHandler = new SleepHandler();
|
||||||
ResourceTrackerService privateResourceTrackerService =
|
ResourceTrackerService privateResourceTrackerService =
|
||||||
getPrivateResourceTrackerService(privateDispatcher, rm, sleepHandler);
|
getPrivateResourceTrackerService(privateDispatcher, rm, sleepHandler);
|
||||||
|
|
Loading…
Reference in New Issue