YARN-2046. Out of band heartbeats are sent only on container kill and possibly too early. Contributed by Ming Ma
This commit is contained in:
parent
0edc764184
commit
b406dcaff2
@ -95,6 +95,9 @@ Release 2.7.3 - UNRELEASED
|
||||
YARN-4707. Remove the extra char (>) from SecureContainer.md.
|
||||
(Brahma Reddy Battula via aajisaka)
|
||||
|
||||
YARN-2046. Out of band heartbeats are sent only on container kill and
|
||||
possibly too early (Ming Ma via jlowe)
|
||||
|
||||
Release 2.7.2 - 2016-01-25
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
@ -972,6 +975,9 @@ Release 2.6.5 - UNRELEASED
|
||||
|
||||
BUG FIXES
|
||||
|
||||
YARN-2046. Out of band heartbeats are sent only on container kill and
|
||||
possibly too early (Ming Ma via jlowe)
|
||||
|
||||
Release 2.6.4 - 2016-02-11
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -77,4 +77,6 @@ public interface Context {
|
||||
boolean getDecommissioned();
|
||||
|
||||
void setDecommissioned(boolean isDecommissioned);
|
||||
|
||||
NodeStatusUpdater getNodeStatusUpdater();
|
||||
}
|
||||
|
@ -256,7 +256,8 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
// StatusUpdater should be added last so that it get started last
|
||||
// so that we make sure everything is up before registering with RM.
|
||||
addService(nodeStatusUpdater);
|
||||
|
||||
((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
|
||||
|
||||
super.serviceInit(conf);
|
||||
// TODO add local dirs to del
|
||||
}
|
||||
@ -354,6 +355,7 @@ public static class NMContext implements Context {
|
||||
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
|
||||
private final NMStateStoreService stateStore;
|
||||
private boolean isDecommissioned = false;
|
||||
private NodeStatusUpdater nodeStatusUpdater;
|
||||
|
||||
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInNM nmTokenSecretManager,
|
||||
@ -458,6 +460,14 @@ public void setSystemCrendentialsForApps(
|
||||
Map<ApplicationId, Credentials> systemCredentials) {
|
||||
this.systemCredentials = systemCredentials;
|
||||
}
|
||||
|
||||
public NodeStatusUpdater getNodeStatusUpdater() {
|
||||
return this.nodeStatusUpdater;
|
||||
}
|
||||
|
||||
public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
|
||||
this.nodeStatusUpdater = nodeStatusUpdater;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -316,9 +316,9 @@ private void recoverContainer(RecoveredContainerState rcs)
|
||||
if (context.getApplications().containsKey(appId)) {
|
||||
Credentials credentials = parseCredentials(launchContext);
|
||||
Container container = new ContainerImpl(getConfig(), dispatcher,
|
||||
context.getNMStateStore(), req.getContainerLaunchContext(),
|
||||
req.getContainerLaunchContext(),
|
||||
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
|
||||
rcs.getDiagnostics(), rcs.getKilled());
|
||||
rcs.getDiagnostics(), rcs.getKilled(), context);
|
||||
context.getContainers().put(containerId, container);
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ApplicationContainerInitEvent(container));
|
||||
@ -833,8 +833,8 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
|
||||
|
||||
Container container =
|
||||
new ContainerImpl(getConfig(), this.dispatcher,
|
||||
context.getNMStateStore(), launchContext,
|
||||
credentials, metrics, containerTokenIdentifier);
|
||||
launchContext, credentials, metrics, containerTokenIdentifier,
|
||||
context);
|
||||
ApplicationId applicationID =
|
||||
containerId.getApplicationAttemptId().getApplicationId();
|
||||
if (context.getContainers().putIfAbsent(containerId, container) != null) {
|
||||
@ -982,10 +982,6 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
|
||||
NMAuditLogger.logSuccess(container.getUser(),
|
||||
AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
|
||||
.getApplicationAttemptId().getApplicationId(), containerID);
|
||||
|
||||
// TODO: Move this code to appropriate place once kill_container is
|
||||
// implemented.
|
||||
nodeStatusUpdater.sendOutofBandHeartBeat();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -67,6 +67,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
||||
@ -123,14 +124,15 @@ public class ContainerImpl implements Container {
|
||||
RecoveredContainerStatus.REQUESTED;
|
||||
// whether container was marked as killed after recovery
|
||||
private boolean recoveredAsKilled = false;
|
||||
private Context context;
|
||||
|
||||
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
|
||||
NMStateStoreService stateStore, ContainerLaunchContext launchContext,
|
||||
Credentials creds, NodeManagerMetrics metrics,
|
||||
ContainerTokenIdentifier containerTokenIdentifier) {
|
||||
ContainerLaunchContext launchContext, Credentials creds,
|
||||
NodeManagerMetrics metrics,
|
||||
ContainerTokenIdentifier containerTokenIdentifier, Context context) {
|
||||
this.daemonConf = conf;
|
||||
this.dispatcher = dispatcher;
|
||||
this.stateStore = stateStore;
|
||||
this.stateStore = context.getNMStateStore();
|
||||
this.launchContext = launchContext;
|
||||
this.containerTokenIdentifier = containerTokenIdentifier;
|
||||
this.containerId = containerTokenIdentifier.getContainerID();
|
||||
@ -142,19 +144,20 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
|
||||
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||
this.readLock = readWriteLock.readLock();
|
||||
this.writeLock = readWriteLock.writeLock();
|
||||
this.context = context;
|
||||
|
||||
stateMachine = stateMachineFactory.make(this);
|
||||
}
|
||||
|
||||
// constructor for a recovered container
|
||||
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
|
||||
NMStateStoreService stateStore, ContainerLaunchContext launchContext,
|
||||
Credentials creds, NodeManagerMetrics metrics,
|
||||
ContainerLaunchContext launchContext, Credentials creds,
|
||||
NodeManagerMetrics metrics,
|
||||
ContainerTokenIdentifier containerTokenIdentifier,
|
||||
RecoveredContainerStatus recoveredStatus, int exitCode,
|
||||
String diagnostics, boolean wasKilled) {
|
||||
this(conf, dispatcher, stateStore, launchContext, creds, metrics,
|
||||
containerTokenIdentifier);
|
||||
String diagnostics, boolean wasKilled, Context context) {
|
||||
this(conf, dispatcher, launchContext, creds, metrics,
|
||||
containerTokenIdentifier, context);
|
||||
this.recoveredStatus = recoveredStatus;
|
||||
this.exitCode = exitCode;
|
||||
this.recoveredAsKilled = wasKilled;
|
||||
@ -973,6 +976,7 @@ public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
|
||||
(AuxServicesEventType.CONTAINER_STOP, container));
|
||||
}
|
||||
container.context.getNodeStatusUpdater().sendOutofBandHeartBeat();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -131,6 +131,7 @@ public long getRMIdentifier() {
|
||||
nodeStatusUpdater.init(conf);
|
||||
((NMContext)context).setContainerManager(containerManager);
|
||||
nodeStatusUpdater.start();
|
||||
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
|
||||
containerManager.init(conf);
|
||||
containerManager.start();
|
||||
|
||||
|
@ -242,8 +242,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||
firstContainerID, InetAddress.getByName("localhost")
|
||||
.getCanonicalHostName(), 1234, user, resource,
|
||||
currentTime + 10000, 123, "password".getBytes(), currentTime));
|
||||
Context context = mock(Context.class);
|
||||
when(context.getNMStateStore()).thenReturn(stateStore);
|
||||
Container container = new ContainerImpl(conf, mockDispatcher,
|
||||
stateStore, launchContext, null, mockMetrics, containerToken);
|
||||
launchContext, null, mockMetrics, containerToken, context);
|
||||
this.context.getContainers().put(firstContainerID, container);
|
||||
} else if (heartBeatID == 2) {
|
||||
// Checks on the RM end
|
||||
@ -272,8 +274,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||
secondContainerID, InetAddress.getByName("localhost")
|
||||
.getCanonicalHostName(), 1234, user, resource,
|
||||
currentTime + 10000, 123, "password".getBytes(), currentTime));
|
||||
Context context = mock(Context.class);
|
||||
when(context.getNMStateStore()).thenReturn(stateStore);
|
||||
Container container = new ContainerImpl(conf, mockDispatcher,
|
||||
stateStore, launchContext, null, mockMetrics, containerToken);
|
||||
launchContext, null, mockMetrics, containerToken, context);
|
||||
this.context.getContainers().put(secondContainerID, container);
|
||||
} else if (heartBeatID == 3) {
|
||||
// Checks on the RM end
|
||||
@ -889,8 +893,9 @@ public void testRemovePreviousCompletedContainersFromContext() throws Exception
|
||||
BuilderUtils.newResource(1024, 1), 0, 123,
|
||||
"password".getBytes(), 0);
|
||||
Container anyCompletedContainer = new ContainerImpl(conf, null,
|
||||
null, null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
|
||||
null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken),
|
||||
nm.getNMContext()) {
|
||||
|
||||
@Override
|
||||
public ContainerState getCurrentState() {
|
||||
@ -910,8 +915,9 @@ public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
||||
1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123,
|
||||
"password".getBytes(), 0);
|
||||
Container runningContainer =
|
||||
new ContainerImpl(conf, null, null, null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) {
|
||||
new ContainerImpl(conf, null, null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(runningContainerToken),
|
||||
nm.getNMContext()) {
|
||||
@Override
|
||||
public ContainerState getCurrentState() {
|
||||
return ContainerState.RUNNING;
|
||||
@ -968,8 +974,9 @@ public void testCompletedContainersIsRecentlyStopped() throws Exception {
|
||||
BuilderUtils.newResource(1024, 1), 0, 123,
|
||||
"password".getBytes(), 0);
|
||||
Container completedContainer = new ContainerImpl(conf, null,
|
||||
null, null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
|
||||
null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken),
|
||||
nm.getNMContext()) {
|
||||
@Override
|
||||
public ContainerState getCurrentState() {
|
||||
return ContainerState.COMPLETE;
|
||||
@ -1005,8 +1012,9 @@ public void testCleanedupApplicationContainerCleanup() throws IOException {
|
||||
BuilderUtils.newResource(1024, 1), 0, 123,
|
||||
"password".getBytes(), 0);
|
||||
Container anyCompletedContainer = new ContainerImpl(conf, null,
|
||||
null, null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
|
||||
null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken),
|
||||
nm.getNMContext()) {
|
||||
|
||||
@Override
|
||||
public ContainerState getCurrentState() {
|
||||
|
@ -182,6 +182,7 @@ public void setup() throws IOException {
|
||||
nodeStatusUpdater.init(conf);
|
||||
containerManager.init(conf);
|
||||
nodeStatusUpdater.start();
|
||||
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
|
||||
}
|
||||
|
||||
protected ContainerManagerImpl
|
||||
|
@ -25,6 +25,7 @@
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
@ -53,6 +54,7 @@
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.junit.Assert;
|
||||
@ -191,8 +193,9 @@ public void testAuxEventDispatch() {
|
||||
ContainerTokenIdentifier cti = new ContainerTokenIdentifier(
|
||||
ContainerId.newContainerId(attemptId, 1), "", "",
|
||||
Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0);
|
||||
Container container = new ContainerImpl(null, null, null, null, null,
|
||||
null, cti);
|
||||
Context context = mock(Context.class);
|
||||
Container container = new ContainerImpl(null, null, null, null,
|
||||
null, cti, context);
|
||||
ContainerId containerId = container.getContainerId();
|
||||
Resource resource = container.getResource();
|
||||
event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container);
|
||||
|
@ -88,6 +88,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.Assert;
|
||||
@ -261,6 +262,7 @@ public void testInitWhileDone() throws Exception {
|
||||
wc.containerSuccessful();
|
||||
wc.containerResourcesCleanup();
|
||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||
verifyOutofBandHeartBeat(wc);
|
||||
assertNull(wc.c.getLocalizedResources());
|
||||
// Now in DONE, issue INIT
|
||||
wc.initContainer();
|
||||
@ -290,6 +292,7 @@ public void testLocalizationFailureAtDone() throws Exception {
|
||||
wc.containerSuccessful();
|
||||
wc.containerResourcesCleanup();
|
||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||
verifyOutofBandHeartBeat(wc);
|
||||
assertNull(wc.c.getLocalizedResources());
|
||||
// Now in DONE, issue RESOURCE_FAILED as done by LocalizeRunner
|
||||
wc.resourceFailedContainer();
|
||||
@ -336,6 +339,7 @@ public void testKillOnNew() throws Exception {
|
||||
int killed = metrics.getKilledContainers();
|
||||
wc.killContainer();
|
||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||
verifyOutofBandHeartBeat(wc);
|
||||
assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
||||
wc.c.cloneAndGetContainerStatus().getExitStatus());
|
||||
assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics()
|
||||
@ -596,6 +600,10 @@ private void verifyCleanupCall(WrappedContainer wc) throws Exception {
|
||||
verify(wc.localizerBus).handle(argThat(matchesReq));
|
||||
}
|
||||
|
||||
private void verifyOutofBandHeartBeat(WrappedContainer wc) {
|
||||
verify(wc.context.getNodeStatusUpdater()).sendOutofBandHeartBeat();
|
||||
}
|
||||
|
||||
private static class ResourcesReleasedMatcher extends
|
||||
ArgumentMatcher<LocalizationEvent> {
|
||||
final HashSet<LocalResourceRequest> resources =
|
||||
@ -722,6 +730,7 @@ private class WrappedContainer {
|
||||
final Container c;
|
||||
final Map<String, LocalResource> localResources;
|
||||
final Map<String, ByteBuffer> serviceData;
|
||||
final Context context = mock(Context.class);
|
||||
|
||||
WrappedContainer(int appId, long timestamp, int id, String user)
|
||||
throws IOException {
|
||||
@ -747,11 +756,12 @@ private class WrappedContainer {
|
||||
dispatcher.register(ApplicationEventType.class, appBus);
|
||||
dispatcher.register(LogHandlerEventType.class, LogBus);
|
||||
|
||||
Context context = mock(Context.class);
|
||||
when(context.getApplications()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, Application>());
|
||||
NMNullStateStoreService stateStore = new NMNullStateStoreService();
|
||||
when(context.getNMStateStore()).thenReturn(stateStore);
|
||||
NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
|
||||
when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater);
|
||||
ContainerExecutor executor = mock(ContainerExecutor.class);
|
||||
launcher =
|
||||
new ContainersLauncher(context, dispatcher, executor, null, null);
|
||||
@ -807,8 +817,8 @@ private class WrappedContainer {
|
||||
}
|
||||
when(ctxt.getServiceData()).thenReturn(serviceData);
|
||||
|
||||
c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
|
||||
ctxt, null, metrics, identifier);
|
||||
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier,
|
||||
context);
|
||||
dispatcher.register(ContainerEventType.class,
|
||||
new EventHandler<ContainerEvent>() {
|
||||
@Override
|
||||
|
@ -208,10 +208,11 @@ public boolean isPmemCheckEnabled() {
|
||||
BuilderUtils.newContainerToken(containerId, "127.0.0.1", 1234, user,
|
||||
BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123,
|
||||
"password".getBytes(), currentTime);
|
||||
Context context = mock(Context.class);
|
||||
Container container =
|
||||
new ContainerImpl(conf, dispatcher, stateStore, launchContext,
|
||||
new ContainerImpl(conf, dispatcher, launchContext,
|
||||
null, metrics,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken), context) {
|
||||
|
||||
@Override
|
||||
public ContainerState getContainerState() {
|
||||
|
Loading…
x
Reference in New Issue
Block a user