YARN-2046. Out of band heartbeats are sent only on container kill and possibly too early. Contributed by Ming Ma

(cherry picked from commit d284e187b8)
This commit is contained in:
Jason Lowe 2016-02-23 20:49:09 +00:00
parent 7812cfdd81
commit 6f3f5a8c38
12 changed files with 83 additions and 35 deletions

View File

@ -1310,6 +1310,9 @@ Release 2.7.3 - UNRELEASED
YARN-4707. Remove the extra char (>) from SecureContainer.md.
(Brahma Reddy Battula via aajisaka)
YARN-2046. Out of band heartbeats are sent only on container kill and
possibly too early (Ming Ma via jlowe)
Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES
@ -2167,6 +2170,9 @@ Release 2.6.5 - UNRELEASED
BUG FIXES
YARN-2046. Out of band heartbeats are sent only on container kill and
possibly too early (Ming Ma via jlowe)
Release 2.6.4 - 2016-02-11
INCOMPATIBLE CHANGES

View File

@ -87,4 +87,6 @@ public interface Context {
ConcurrentLinkedQueue<LogAggregationReport>
getLogAggregationStatusForApps();
NodeStatusUpdater getNodeStatusUpdater();
}

View File

@ -352,6 +352,7 @@ public class NodeManager extends CompositeService
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater);
((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
super.serviceInit(conf);
// TODO add local dirs to del
@ -461,6 +462,7 @@ public class NodeManager extends CompositeService
private boolean isDecommissioned = false;
private final ConcurrentLinkedQueue<LogAggregationReport>
logAggregationReportForApps;
private NodeStatusUpdater nodeStatusUpdater;
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
@ -588,6 +590,14 @@ public class NodeManager extends CompositeService
getLogAggregationStatusForApps() {
return this.logAggregationReportForApps;
}
public NodeStatusUpdater getNodeStatusUpdater() {
return this.nodeStatusUpdater;
}
public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
this.nodeStatusUpdater = nodeStatusUpdater;
}
}

View File

@ -347,9 +347,9 @@ public class ContainerManagerImpl extends CompositeService implements
Credentials credentials =
YarnServerSecurityUtils.parseCredentials(launchContext);
Container container = new ContainerImpl(getConfig(), dispatcher,
context.getNMStateStore(), req.getContainerLaunchContext(),
req.getContainerLaunchContext(),
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability());
rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), context);
context.getContainers().put(containerId, container);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
@ -907,8 +907,8 @@ public class ContainerManagerImpl extends CompositeService implements
Container container =
new ContainerImpl(getConfig(), this.dispatcher,
context.getNMStateStore(), launchContext,
credentials, metrics, containerTokenIdentifier);
launchContext, credentials, metrics, containerTokenIdentifier,
context);
ApplicationId applicationID =
containerId.getApplicationAttemptId().getApplicationId();
if (context.getContainers().putIfAbsent(containerId, container) != null) {
@ -1179,10 +1179,6 @@ public class ContainerManagerImpl extends CompositeService implements
NMAuditLogger.logSuccess(container.getUser(),
AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
.getApplicationAttemptId().getApplicationId(), containerID);
// TODO: Move this code to appropriate place once kill_container is
// implemented.
nodeStatusUpdater.sendOutofBandHeartBeat();
}
}

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.shar
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
@ -125,14 +126,15 @@ public class ContainerImpl implements Container {
RecoveredContainerStatus.REQUESTED;
// whether container was marked as killed after recovery
private boolean recoveredAsKilled = false;
private Context context;
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
NMStateStoreService stateStore, ContainerLaunchContext launchContext,
Credentials creds, NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier) {
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier, Context context) {
this.daemonConf = conf;
this.dispatcher = dispatcher;
this.stateStore = stateStore;
this.stateStore = context.getNMStateStore();
this.launchContext = launchContext;
this.containerTokenIdentifier = containerTokenIdentifier;
this.containerId = containerTokenIdentifier.getContainerID();
@ -144,19 +146,21 @@ public class ContainerImpl implements Container {
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.context = context;
stateMachine = stateMachineFactory.make(this);
}
// constructor for a recovered container
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
NMStateStoreService stateStore, ContainerLaunchContext launchContext,
Credentials creds, NodeManagerMetrics metrics,
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier,
RecoveredContainerStatus recoveredStatus, int exitCode,
String diagnostics, boolean wasKilled, Resource recoveredCapability) {
this(conf, dispatcher, stateStore, launchContext, creds, metrics,
containerTokenIdentifier);
String diagnostics, boolean wasKilled, Resource recoveredCapability,
Context context) {
this(conf, dispatcher, launchContext, creds, metrics,
containerTokenIdentifier, context);
this.recoveredStatus = recoveredStatus;
this.exitCode = exitCode;
this.recoveredAsKilled = wasKilled;
@ -988,11 +992,12 @@ public class ContainerImpl implements Container {
container.sendFinishedEvents();
//if the current state is NEW it means the CONTAINER_INIT was never
// sent for the event, thus no need to send the CONTAINER_STOP
if (container.getCurrentState()
if (container.getCurrentState()
!= org.apache.hadoop.yarn.api.records.ContainerState.NEW) {
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
(AuxServicesEventType.CONTAINER_STOP, container));
}
container.context.getNodeStatusUpdater().sendOutofBandHeartBeat();
}
}

View File

@ -131,6 +131,7 @@ public class TestEventFlow {
nodeStatusUpdater.init(conf);
((NMContext)context).setContainerManager(containerManager);
nodeStatusUpdater.start();
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
containerManager.init(conf);
containerManager.start();

View File

@ -252,8 +252,10 @@ public class TestNodeStatusUpdater {
firstContainerID, InetAddress.getByName("localhost")
.getCanonicalHostName(), 1234, user, resource,
currentTime + 10000, 123, "password".getBytes(), currentTime));
Context context = mock(Context.class);
when(context.getNMStateStore()).thenReturn(stateStore);
Container container = new ContainerImpl(conf, mockDispatcher,
stateStore, launchContext, null, mockMetrics, containerToken);
launchContext, null, mockMetrics, containerToken, context);
this.context.getContainers().put(firstContainerID, container);
} else if (heartBeatID == 2) {
// Checks on the RM end
@ -291,8 +293,10 @@ public class TestNodeStatusUpdater {
secondContainerID, InetAddress.getByName("localhost")
.getCanonicalHostName(), 1234, user, resource,
currentTime + 10000, 123, "password".getBytes(), currentTime));
Context context = mock(Context.class);
when(context.getNMStateStore()).thenReturn(stateStore);
Container container = new ContainerImpl(conf, mockDispatcher,
stateStore, launchContext, null, mockMetrics, containerToken);
launchContext, null, mockMetrics, containerToken, context);
this.context.getContainers().put(secondContainerID, container);
} else if (heartBeatID == 3) {
// Checks on the RM end
@ -1007,8 +1011,9 @@ public class TestNodeStatusUpdater {
BuilderUtils.newResource(1024, 1), 0, 123,
"password".getBytes(), 0);
Container anyCompletedContainer = new ContainerImpl(conf, null,
null, null, null, null,
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
null, null, null,
BuilderUtils.newContainerTokenIdentifier(containerToken),
nm.getNMContext()) {
@Override
public ContainerState getCurrentState() {
@ -1028,8 +1033,9 @@ public class TestNodeStatusUpdater {
1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123,
"password".getBytes(), 0);
Container runningContainer =
new ContainerImpl(conf, null, null, null, null, null,
BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) {
new ContainerImpl(conf, null, null, null, null,
BuilderUtils.newContainerTokenIdentifier(runningContainerToken),
nm.getNMContext()) {
@Override
public ContainerState getCurrentState() {
return ContainerState.RUNNING;
@ -1086,8 +1092,9 @@ public class TestNodeStatusUpdater {
BuilderUtils.newResource(1024, 1), 0, 123,
"password".getBytes(), 0);
Container completedContainer = new ContainerImpl(conf, null,
null, null, null, null,
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
null, null, null,
BuilderUtils.newContainerTokenIdentifier(containerToken),
nm.getNMContext()) {
@Override
public ContainerState getCurrentState() {
return ContainerState.COMPLETE;
@ -1123,8 +1130,9 @@ public class TestNodeStatusUpdater {
BuilderUtils.newResource(1024, 1), 0, 123,
"password".getBytes(), 0);
Container anyCompletedContainer = new ContainerImpl(conf, null,
null, null, null, null,
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
null, null, null,
BuilderUtils.newContainerTokenIdentifier(containerToken),
nm.getNMContext()) {
@Override
public ContainerState getCurrentState() {

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@ -678,5 +679,9 @@ public abstract class BaseAMRMProxyTest {
return null;
}
@Override
public NodeStatusUpdater getNodeStatusUpdater() {
return null;
}
}
}

View File

@ -188,6 +188,7 @@ public abstract class BaseContainerManagerTest {
nodeStatusUpdater.init(conf);
containerManager.init(conf);
nodeStatusUpdater.start();
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
}
protected ContainerManagerImpl

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.junit.Assert;
@ -192,8 +194,9 @@ public class TestAuxServices {
ContainerTokenIdentifier cti = new ContainerTokenIdentifier(
ContainerId.newContainerId(attemptId, 1), "", "",
Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0);
Container container = new ContainerImpl(null, null, null, null, null,
null, cti);
Context context = mock(Context.class);
Container container = new ContainerImpl(null, null, null, null,
null, cti, context);
ContainerId containerId = container.getContainerId();
Resource resource = container.getResource();
event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container);

View File

@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
@ -261,6 +262,7 @@ public class TestContainer {
wc.containerSuccessful();
wc.containerResourcesCleanup();
assertEquals(ContainerState.DONE, wc.c.getContainerState());
verifyOutofBandHeartBeat(wc);
assertNull(wc.c.getLocalizedResources());
// Now in DONE, issue INIT
wc.initContainer();
@ -290,6 +292,7 @@ public class TestContainer {
wc.containerSuccessful();
wc.containerResourcesCleanup();
assertEquals(ContainerState.DONE, wc.c.getContainerState());
verifyOutofBandHeartBeat(wc);
assertNull(wc.c.getLocalizedResources());
// Now in DONE, issue RESOURCE_FAILED as done by LocalizeRunner
wc.resourceFailedContainer();
@ -336,6 +339,7 @@ public class TestContainer {
int killed = metrics.getKilledContainers();
wc.killContainer();
assertEquals(ContainerState.DONE, wc.c.getContainerState());
verifyOutofBandHeartBeat(wc);
assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
wc.c.cloneAndGetContainerStatus().getExitStatus());
assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics()
@ -653,6 +657,10 @@ public class TestContainer {
verify(wc.localizerBus).handle(argThat(matchesReq));
}
private void verifyOutofBandHeartBeat(WrappedContainer wc) {
verify(wc.context.getNodeStatusUpdater()).sendOutofBandHeartBeat();
}
private static class ResourcesReleasedMatcher extends
ArgumentMatcher<LocalizationEvent> {
final HashSet<LocalResourceRequest> resources =
@ -779,6 +787,7 @@ public class TestContainer {
final Container c;
final Map<String, LocalResource> localResources;
final Map<String, ByteBuffer> serviceData;
final Context context = mock(Context.class);
WrappedContainer(int appId, long timestamp, int id, String user)
throws IOException {
@ -804,11 +813,12 @@ public class TestContainer {
dispatcher.register(ApplicationEventType.class, appBus);
dispatcher.register(LogHandlerEventType.class, LogBus);
Context context = mock(Context.class);
when(context.getApplications()).thenReturn(
new ConcurrentHashMap<ApplicationId, Application>());
NMNullStateStoreService stateStore = new NMNullStateStoreService();
when(context.getNMStateStore()).thenReturn(stateStore);
NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater);
ContainerExecutor executor = mock(ContainerExecutor.class);
launcher =
new ContainersLauncher(context, dispatcher, executor, null, null);
@ -864,8 +874,8 @@ public class TestContainer {
}
when(ctxt.getServiceData()).thenReturn(serviceData);
c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
ctxt, null, metrics, identifier);
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier,
context);
dispatcher.register(ContainerEventType.class,
new EventHandler<ContainerEvent>() {
@Override

View File

@ -215,10 +215,11 @@ public class TestNMWebServer {
BuilderUtils.newContainerToken(containerId, "127.0.0.1", 1234, user,
BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123,
"password".getBytes(), currentTime);
Context context = mock(Context.class);
Container container =
new ContainerImpl(conf, dispatcher, stateStore, launchContext,
new ContainerImpl(conf, dispatcher, launchContext,
null, metrics,
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
BuilderUtils.newContainerTokenIdentifier(containerToken), context) {
@Override
public ContainerState getContainerState() {