YARN-8330. Improved publishing ALLOCATED events to ATS.

Contributed by Suma Shivaprasad

(cherry picked from commit f93ecf5c1e)
This commit is contained in:
Eric Yang 2018-07-25 18:49:30 -04:00
parent 964f3454d1
commit 8e3807afe0
2 changed files with 43 additions and 32 deletions

View File

@ -244,23 +244,13 @@ public class RMContainerImpl implements RMContainer {
this.readLock = lock.readLock(); this.readLock = lock.readLock();
this.writeLock = lock.writeLock(); this.writeLock = lock.writeLock();
saveNonAMContainerMetaInfo = rmContext.getYarnConfiguration().getBoolean( saveNonAMContainerMetaInfo =
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, shouldPublishNonAMContainerEventstoATS(rmContext);
YarnConfiguration
.DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO);
if (container.getId() != null) { if (container.getId() != null) {
rmContext.getRMApplicationHistoryWriter().containerStarted(this); rmContext.getRMApplicationHistoryWriter().containerStarted(this);
} }
// If saveNonAMContainerMetaInfo is true, store system metrics for all
// containers. If false, and if this container is marked as the AM, metrics
// will still be published for this container, but that calculation happens
// later.
if (saveNonAMContainerMetaInfo && null != container.getId()) {
rmContext.getSystemMetricsPublisher().containerCreated(
this, this.creationTime);
}
if (this.container != null) { if (this.container != null) {
this.allocationTags = this.container.getAllocationTags(); this.allocationTags = this.container.getAllocationTags();
} }
@ -590,8 +580,12 @@ public class RMContainerImpl implements RMContainer {
container.getNodeId(), container.getContainerId(), container.getNodeId(), container.getContainerId(),
container.getAllocationTags()); container.getAllocationTags());
container.eventHandler.handle(new RMAppAttemptEvent( container.eventHandler.handle(
container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED)); new RMAppAttemptEvent(container.appAttemptId,
RMAppAttemptEventType.CONTAINER_ALLOCATED));
publishNonAMContainerEventstoATS(container);
} }
} }
@ -610,6 +604,8 @@ public class RMContainerImpl implements RMContainer {
// Tell the app // Tell the app
container.eventHandler.handle(new RMAppRunningOnNodeEvent(container container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
.getApplicationAttemptId().getApplicationId(), container.nodeId)); .getApplicationAttemptId().getApplicationId(), container.nodeId));
publishNonAMContainerEventstoATS(container);
} }
} }
@ -718,17 +714,12 @@ public class RMContainerImpl implements RMContainer {
container); container);
boolean saveNonAMContainerMetaInfo = boolean saveNonAMContainerMetaInfo =
container.rmContext.getYarnConfiguration().getBoolean( shouldPublishNonAMContainerEventstoATS(container.rmContext);
YarnConfiguration
.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
YarnConfiguration
.DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO);
if (saveNonAMContainerMetaInfo || container.isAMContainer()) { if (saveNonAMContainerMetaInfo || container.isAMContainer()) {
container.rmContext.getSystemMetricsPublisher().containerFinished( container.rmContext.getSystemMetricsPublisher().containerFinished(
container, container.finishTime); container, container.finishTime);
} }
} }
private static void updateAttemptMetrics(RMContainerImpl container) { private static void updateAttemptMetrics(RMContainerImpl container) {
@ -754,6 +745,29 @@ public class RMContainerImpl implements RMContainer {
} }
} }
/**
 * Looks up whether container meta info should be saved for non-AM
 * containers.
 *
 * @param rmContext context whose YARN configuration is consulted
 * @return the value of
 *     {@code APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO}, or
 *     {@code DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO}
 *     when the key is not set
 */
private static boolean shouldPublishNonAMContainerEventstoATS(
    RMContext rmContext) {
  final boolean publishNonAMContainers =
      rmContext.getYarnConfiguration().getBoolean(
          YarnConfiguration
              .APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
          YarnConfiguration
              .DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO);
  return publishNonAMContainers;
}
/**
 * Reports a container-created event to the system metrics publisher when
 * saving meta info for non-AM containers is enabled and the container
 * already has an id.
 *
 * @param rmContainer container whose creation time is published
 */
private static void publishNonAMContainerEventstoATS(
    RMContainerImpl rmContainer) {
  // With the setting enabled, system metrics are stored for every
  // container. With it disabled nothing is published from here; per the
  // original note, a container marked as the AM still gets its metrics
  // published, but that decision is made later.
  if (!shouldPublishNonAMContainerEventstoATS(rmContainer.rmContext)) {
    return;
  }
  // A container without an id is not reported.
  if (rmContainer.container.getId() == null) {
    return;
  }
  rmContainer.rmContext.getSystemMetricsPublisher().containerCreated(
      rmContainer, rmContainer.creationTime);
}
private static final class KillTransition extends FinishedTransition { private static final class KillTransition extends FinishedTransition {
@Override @Override
@ -884,13 +898,5 @@ public class RMContainerImpl implements RMContainer {
if (containerId != null) { if (containerId != null) {
rmContext.getRMApplicationHistoryWriter().containerStarted(this); rmContext.getRMApplicationHistoryWriter().containerStarted(this);
} }
// If saveNonAMContainerMetaInfo is true, store system metrics for all
// containers. If false, and if this container is marked as the AM, metrics
// will still be published for this container, but that calculation happens
// later.
if (saveNonAMContainerMetaInfo && null != container.getId()) {
rmContext.getSystemMetricsPublisher().containerCreated(
this, this.creationTime);
}
} }
} }

View File

@ -135,7 +135,6 @@ public class TestRMContainerImpl {
assertEquals(priority, assertEquals(priority,
rmContainer.getAllocatedSchedulerKey().getPriority()); rmContainer.getAllocatedSchedulerKey().getPriority());
verify(writer).containerStarted(any(RMContainer.class)); verify(writer).containerStarted(any(RMContainer.class));
verify(publisher).containerCreated(any(RMContainer.class), anyLong());
rmContainer.handle(new RMContainerEvent(containerId, rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START)); RMContainerEventType.START));
@ -150,6 +149,8 @@ public class TestRMContainerImpl {
RMContainerEventType.LAUNCHED)); RMContainerEventType.LAUNCHED));
drainDispatcher.await(); drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState()); assertEquals(RMContainerState.RUNNING, rmContainer.getState());
verify(publisher, times(2)).containerCreated(any(RMContainer.class),
anyLong());
assertEquals("http://host:3465/node/containerlogs/container_1_0001_01_000001/user", assertEquals("http://host:3465/node/containerlogs/container_1_0001_01_000001/user",
rmContainer.getLogURL()); rmContainer.getLogURL());
@ -240,22 +241,25 @@ public class TestRMContainerImpl {
assertEquals(priority, assertEquals(priority,
rmContainer.getAllocatedSchedulerKey().getPriority()); rmContainer.getAllocatedSchedulerKey().getPriority());
verify(writer).containerStarted(any(RMContainer.class)); verify(writer).containerStarted(any(RMContainer.class));
verify(publisher).containerCreated(any(RMContainer.class), anyLong());
rmContainer.handle(new RMContainerEvent(containerId, rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START)); RMContainerEventType.START));
drainDispatcher.await(); drainDispatcher.await();
assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
verify(publisher).containerCreated(any(RMContainer.class), anyLong());
rmContainer.handle(new RMContainerEvent(containerId, rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.ACQUIRED)); RMContainerEventType.ACQUIRED));
drainDispatcher.await(); drainDispatcher.await();
assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
verify(publisher, times(2)).containerCreated(any(RMContainer.class),
anyLong());
rmContainer.handle(new RMContainerEvent(containerId, rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.LAUNCHED)); RMContainerEventType.LAUNCHED));
drainDispatcher.await(); drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState()); assertEquals(RMContainerState.RUNNING, rmContainer.getState());
assertEquals("http://host:3465/node/containerlogs/container_1_0001_01_000001/user", assertEquals("http://host:3465/node/containerlogs/container_1_0001_01_000001/user",
rmContainer.getLogURL()); rmContainer.getLogURL());
@ -340,7 +344,8 @@ public class TestRMContainerImpl {
// RMContainer should be publishing system metrics for all containers. // RMContainer should be publishing system metrics for all containers.
// Since there is 1 AM container and 1 non-AM container, there should be 2 // Since there is 1 AM container and 1 non-AM container, there should be 2
// container created events and 2 container finished events. // container created events and 2 container finished events.
verify(publisher, times(2)).containerCreated(any(RMContainer.class), anyLong()); verify(publisher, times(4)).containerCreated(any(RMContainer.class),
anyLong());
verify(publisher, times(2)).containerFinished(any(RMContainer.class), anyLong()); verify(publisher, times(2)).containerFinished(any(RMContainer.class), anyLong());
} }