YARN-2046. Out of band heartbeats are sent only on container kill and possibly too early. Contributed by Ming Ma
(cherry picked from commit d284e187b8
)
This commit is contained in:
parent
440379a0d4
commit
acffe82353
|
@ -1483,6 +1483,9 @@ Release 2.7.3 - UNRELEASED
|
|||
YARN-4707. Remove the extra char (>) from SecureContainer.md.
|
||||
(Brahma Reddy Battula via aajisaka)
|
||||
|
||||
YARN-2046. Out of band heartbeats are sent only on container kill and
|
||||
possibly too early (Ming Ma via jlowe)
|
||||
|
||||
Release 2.7.2 - 2016-01-25
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -2340,6 +2343,9 @@ Release 2.6.5 - UNRELEASED
|
|||
|
||||
BUG FIXES
|
||||
|
||||
YARN-2046. Out of band heartbeats are sent only on container kill and
|
||||
possibly too early (Ming Ma via jlowe)
|
||||
|
||||
Release 2.6.4 - 2016-02-11
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -87,4 +87,6 @@ public interface Context {
|
|||
|
||||
ConcurrentLinkedQueue<LogAggregationReport>
|
||||
getLogAggregationStatusForApps();
|
||||
|
||||
NodeStatusUpdater getNodeStatusUpdater();
|
||||
}
|
||||
|
|
|
@ -353,6 +353,7 @@ public class NodeManager extends CompositeService
|
|||
// StatusUpdater should be added last so that it get started last
|
||||
// so that we make sure everything is up before registering with RM.
|
||||
addService(nodeStatusUpdater);
|
||||
((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
|
||||
|
||||
super.serviceInit(conf);
|
||||
// TODO add local dirs to del
|
||||
|
@ -458,6 +459,7 @@ public class NodeManager extends CompositeService
|
|||
private boolean isDecommissioned = false;
|
||||
private final ConcurrentLinkedQueue<LogAggregationReport>
|
||||
logAggregationReportForApps;
|
||||
private NodeStatusUpdater nodeStatusUpdater;
|
||||
|
||||
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInNM nmTokenSecretManager,
|
||||
|
@ -585,6 +587,14 @@ public class NodeManager extends CompositeService
|
|||
getLogAggregationStatusForApps() {
|
||||
return this.logAggregationReportForApps;
|
||||
}
|
||||
|
||||
public NodeStatusUpdater getNodeStatusUpdater() {
|
||||
return this.nodeStatusUpdater;
|
||||
}
|
||||
|
||||
public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
|
||||
this.nodeStatusUpdater = nodeStatusUpdater;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -361,9 +361,9 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
Credentials credentials =
|
||||
YarnServerSecurityUtils.parseCredentials(launchContext);
|
||||
Container container = new ContainerImpl(getConfig(), dispatcher,
|
||||
context.getNMStateStore(), req.getContainerLaunchContext(),
|
||||
req.getContainerLaunchContext(),
|
||||
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
|
||||
rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability());
|
||||
rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), context);
|
||||
context.getContainers().put(containerId, container);
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ApplicationContainerInitEvent(container));
|
||||
|
@ -921,8 +921,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
|
||||
Container container =
|
||||
new ContainerImpl(getConfig(), this.dispatcher,
|
||||
context.getNMStateStore(), launchContext,
|
||||
credentials, metrics, containerTokenIdentifier);
|
||||
launchContext, credentials, metrics, containerTokenIdentifier,
|
||||
context);
|
||||
ApplicationId applicationID =
|
||||
containerId.getApplicationAttemptId().getApplicationId();
|
||||
if (context.getContainers().putIfAbsent(containerId, container) != null) {
|
||||
|
@ -1193,10 +1193,6 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
NMAuditLogger.logSuccess(container.getUser(),
|
||||
AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
|
||||
.getApplicationAttemptId().getApplicationId(), containerID);
|
||||
|
||||
// TODO: Move this code to appropriate place once kill_container is
|
||||
// implemented.
|
||||
nodeStatusUpdater.sendOutofBandHeartBeat();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.shar
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
||||
|
@ -125,14 +126,15 @@ public class ContainerImpl implements Container {
|
|||
RecoveredContainerStatus.REQUESTED;
|
||||
// whether container was marked as killed after recovery
|
||||
private boolean recoveredAsKilled = false;
|
||||
private Context context;
|
||||
|
||||
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
|
||||
NMStateStoreService stateStore, ContainerLaunchContext launchContext,
|
||||
Credentials creds, NodeManagerMetrics metrics,
|
||||
ContainerTokenIdentifier containerTokenIdentifier) {
|
||||
ContainerLaunchContext launchContext, Credentials creds,
|
||||
NodeManagerMetrics metrics,
|
||||
ContainerTokenIdentifier containerTokenIdentifier, Context context) {
|
||||
this.daemonConf = conf;
|
||||
this.dispatcher = dispatcher;
|
||||
this.stateStore = stateStore;
|
||||
this.stateStore = context.getNMStateStore();
|
||||
this.launchContext = launchContext;
|
||||
this.containerTokenIdentifier = containerTokenIdentifier;
|
||||
this.containerId = containerTokenIdentifier.getContainerID();
|
||||
|
@ -144,19 +146,21 @@ public class ContainerImpl implements Container {
|
|||
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||
this.readLock = readWriteLock.readLock();
|
||||
this.writeLock = readWriteLock.writeLock();
|
||||
this.context = context;
|
||||
|
||||
stateMachine = stateMachineFactory.make(this);
|
||||
}
|
||||
|
||||
// constructor for a recovered container
|
||||
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
|
||||
NMStateStoreService stateStore, ContainerLaunchContext launchContext,
|
||||
Credentials creds, NodeManagerMetrics metrics,
|
||||
ContainerLaunchContext launchContext, Credentials creds,
|
||||
NodeManagerMetrics metrics,
|
||||
ContainerTokenIdentifier containerTokenIdentifier,
|
||||
RecoveredContainerStatus recoveredStatus, int exitCode,
|
||||
String diagnostics, boolean wasKilled, Resource recoveredCapability) {
|
||||
this(conf, dispatcher, stateStore, launchContext, creds, metrics,
|
||||
containerTokenIdentifier);
|
||||
String diagnostics, boolean wasKilled, Resource recoveredCapability,
|
||||
Context context) {
|
||||
this(conf, dispatcher, launchContext, creds, metrics,
|
||||
containerTokenIdentifier, context);
|
||||
this.recoveredStatus = recoveredStatus;
|
||||
this.exitCode = exitCode;
|
||||
this.recoveredAsKilled = wasKilled;
|
||||
|
@ -993,6 +997,7 @@ public class ContainerImpl implements Container {
|
|||
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
|
||||
(AuxServicesEventType.CONTAINER_STOP, container));
|
||||
}
|
||||
container.context.getNodeStatusUpdater().sendOutofBandHeartBeat();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -131,6 +131,7 @@ public class TestEventFlow {
|
|||
nodeStatusUpdater.init(conf);
|
||||
((NMContext)context).setContainerManager(containerManager);
|
||||
nodeStatusUpdater.start();
|
||||
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
|
||||
containerManager.init(conf);
|
||||
containerManager.start();
|
||||
|
||||
|
|
|
@ -254,8 +254,10 @@ public class TestNodeStatusUpdater {
|
|||
firstContainerID, InetAddress.getByName("localhost")
|
||||
.getCanonicalHostName(), 1234, user, resource,
|
||||
currentTime + 10000, 123, "password".getBytes(), currentTime));
|
||||
Context context = mock(Context.class);
|
||||
when(context.getNMStateStore()).thenReturn(stateStore);
|
||||
Container container = new ContainerImpl(conf, mockDispatcher,
|
||||
stateStore, launchContext, null, mockMetrics, containerToken);
|
||||
launchContext, null, mockMetrics, containerToken, context);
|
||||
this.context.getContainers().put(firstContainerID, container);
|
||||
} else if (heartBeatID == 2) {
|
||||
// Checks on the RM end
|
||||
|
@ -293,8 +295,10 @@ public class TestNodeStatusUpdater {
|
|||
secondContainerID, InetAddress.getByName("localhost")
|
||||
.getCanonicalHostName(), 1234, user, resource,
|
||||
currentTime + 10000, 123, "password".getBytes(), currentTime));
|
||||
Context context = mock(Context.class);
|
||||
when(context.getNMStateStore()).thenReturn(stateStore);
|
||||
Container container = new ContainerImpl(conf, mockDispatcher,
|
||||
stateStore, launchContext, null, mockMetrics, containerToken);
|
||||
launchContext, null, mockMetrics, containerToken, context);
|
||||
this.context.getContainers().put(secondContainerID, container);
|
||||
} else if (heartBeatID == 3) {
|
||||
// Checks on the RM end
|
||||
|
@ -1011,8 +1015,9 @@ public class TestNodeStatusUpdater {
|
|||
BuilderUtils.newResource(1024, 1), 0, 123,
|
||||
"password".getBytes(), 0);
|
||||
Container anyCompletedContainer = new ContainerImpl(conf, null,
|
||||
null, null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
|
||||
null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken),
|
||||
nm.getNMContext()) {
|
||||
|
||||
@Override
|
||||
public ContainerState getCurrentState() {
|
||||
|
@ -1032,8 +1037,9 @@ public class TestNodeStatusUpdater {
|
|||
1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123,
|
||||
"password".getBytes(), 0);
|
||||
Container runningContainer =
|
||||
new ContainerImpl(conf, null, null, null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) {
|
||||
new ContainerImpl(conf, null, null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(runningContainerToken),
|
||||
nm.getNMContext()) {
|
||||
@Override
|
||||
public ContainerState getCurrentState() {
|
||||
return ContainerState.RUNNING;
|
||||
|
@ -1090,8 +1096,9 @@ public class TestNodeStatusUpdater {
|
|||
BuilderUtils.newResource(1024, 1), 0, 123,
|
||||
"password".getBytes(), 0);
|
||||
Container completedContainer = new ContainerImpl(conf, null,
|
||||
null, null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
|
||||
null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken),
|
||||
nm.getNMContext()) {
|
||||
@Override
|
||||
public ContainerState getCurrentState() {
|
||||
return ContainerState.COMPLETE;
|
||||
|
@ -1127,8 +1134,9 @@ public class TestNodeStatusUpdater {
|
|||
BuilderUtils.newResource(1024, 1), 0, 123,
|
||||
"password".getBytes(), 0);
|
||||
Container anyCompletedContainer = new ContainerImpl(conf, null,
|
||||
null, null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
|
||||
null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken),
|
||||
nm.getNMContext()) {
|
||||
|
||||
@Override
|
||||
public ContainerState getCurrentState() {
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
|
@ -678,5 +679,9 @@ public abstract class BaseAMRMProxyTest {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeStatusUpdater getNodeStatusUpdater() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -188,6 +188,7 @@ public abstract class BaseContainerManagerTest {
|
|||
nodeStatusUpdater.init(conf);
|
||||
containerManager.init(conf);
|
||||
nodeStatusUpdater.start();
|
||||
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
|
||||
}
|
||||
|
||||
protected ContainerManagerImpl
|
||||
|
|
|
@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
|||
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.junit.Assert;
|
||||
|
@ -192,8 +194,9 @@ public class TestAuxServices {
|
|||
ContainerTokenIdentifier cti = new ContainerTokenIdentifier(
|
||||
ContainerId.newContainerId(attemptId, 1), "", "",
|
||||
Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0);
|
||||
Container container = new ContainerImpl(null, null, null, null, null,
|
||||
null, cti);
|
||||
Context context = mock(Context.class);
|
||||
Container container = new ContainerImpl(null, null, null, null,
|
||||
null, cti, context);
|
||||
ContainerId containerId = container.getContainerId();
|
||||
Resource resource = container.getResource();
|
||||
event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container);
|
||||
|
|
|
@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.Assert;
|
||||
|
@ -261,6 +262,7 @@ public class TestContainer {
|
|||
wc.containerSuccessful();
|
||||
wc.containerResourcesCleanup();
|
||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||
verifyOutofBandHeartBeat(wc);
|
||||
assertNull(wc.c.getLocalizedResources());
|
||||
// Now in DONE, issue INIT
|
||||
wc.initContainer();
|
||||
|
@ -290,6 +292,7 @@ public class TestContainer {
|
|||
wc.containerSuccessful();
|
||||
wc.containerResourcesCleanup();
|
||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||
verifyOutofBandHeartBeat(wc);
|
||||
assertNull(wc.c.getLocalizedResources());
|
||||
// Now in DONE, issue RESOURCE_FAILED as done by LocalizeRunner
|
||||
wc.resourceFailedContainer();
|
||||
|
@ -336,6 +339,7 @@ public class TestContainer {
|
|||
int killed = metrics.getKilledContainers();
|
||||
wc.killContainer();
|
||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||
verifyOutofBandHeartBeat(wc);
|
||||
assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
||||
wc.c.cloneAndGetContainerStatus().getExitStatus());
|
||||
assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics()
|
||||
|
@ -653,6 +657,10 @@ public class TestContainer {
|
|||
verify(wc.localizerBus).handle(argThat(matchesReq));
|
||||
}
|
||||
|
||||
private void verifyOutofBandHeartBeat(WrappedContainer wc) {
|
||||
verify(wc.context.getNodeStatusUpdater()).sendOutofBandHeartBeat();
|
||||
}
|
||||
|
||||
private static class ResourcesReleasedMatcher extends
|
||||
ArgumentMatcher<LocalizationEvent> {
|
||||
final HashSet<LocalResourceRequest> resources =
|
||||
|
@ -779,6 +787,7 @@ public class TestContainer {
|
|||
final Container c;
|
||||
final Map<String, LocalResource> localResources;
|
||||
final Map<String, ByteBuffer> serviceData;
|
||||
final Context context = mock(Context.class);
|
||||
|
||||
WrappedContainer(int appId, long timestamp, int id, String user)
|
||||
throws IOException {
|
||||
|
@ -804,11 +813,12 @@ public class TestContainer {
|
|||
dispatcher.register(ApplicationEventType.class, appBus);
|
||||
dispatcher.register(LogHandlerEventType.class, LogBus);
|
||||
|
||||
Context context = mock(Context.class);
|
||||
when(context.getApplications()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, Application>());
|
||||
NMNullStateStoreService stateStore = new NMNullStateStoreService();
|
||||
when(context.getNMStateStore()).thenReturn(stateStore);
|
||||
NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
|
||||
when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater);
|
||||
ContainerExecutor executor = mock(ContainerExecutor.class);
|
||||
launcher =
|
||||
new ContainersLauncher(context, dispatcher, executor, null, null);
|
||||
|
@ -864,8 +874,8 @@ public class TestContainer {
|
|||
}
|
||||
when(ctxt.getServiceData()).thenReturn(serviceData);
|
||||
|
||||
c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
|
||||
ctxt, null, metrics, identifier);
|
||||
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier,
|
||||
context);
|
||||
dispatcher.register(ContainerEventType.class,
|
||||
new EventHandler<ContainerEvent>() {
|
||||
@Override
|
||||
|
|
|
@ -215,10 +215,11 @@ public class TestNMWebServer {
|
|||
BuilderUtils.newContainerToken(containerId, "127.0.0.1", 1234, user,
|
||||
BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123,
|
||||
"password".getBytes(), currentTime);
|
||||
Context context = mock(Context.class);
|
||||
Container container =
|
||||
new ContainerImpl(conf, dispatcher, stateStore, launchContext,
|
||||
new ContainerImpl(conf, dispatcher, launchContext,
|
||||
null, metrics,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken), context) {
|
||||
|
||||
@Override
|
||||
public ContainerState getContainerState() {
|
||||
|
|
Loading…
Reference in New Issue