YARN-3978. Configurably turn off the saving of container info in Generic AHS (Eric Payne via jeagles)
(cherry picked from commit 3cd02b9522
)
This commit is contained in:
parent
c6e79178d4
commit
899df5bce0
|
@ -681,6 +681,9 @@ Release 2.7.2 - UNRELEASED
|
||||||
YARN-3967. Fetch the application report from the AHS if the RM does not know about it.
|
YARN-3967. Fetch the application report from the AHS if the RM does not know about it.
|
||||||
(Mit Desai via xgong)
|
(Mit Desai via xgong)
|
||||||
|
|
||||||
|
YARN-3978. Configurably turn off the saving of container info in Generic AHS
|
||||||
|
(Eric Payne via jeagles)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -1435,6 +1435,15 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String APPLICATION_HISTORY_STORE =
|
public static final String APPLICATION_HISTORY_STORE =
|
||||||
APPLICATION_HISTORY_PREFIX + "store-class";
|
APPLICATION_HISTORY_PREFIX + "store-class";
|
||||||
|
|
||||||
|
/** Save container meta-info in the application history store. */
|
||||||
|
@Private
|
||||||
|
public static final String
|
||||||
|
APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO =
|
||||||
|
APPLICATION_HISTORY_PREFIX + "save-non-am-container-meta-info";
|
||||||
|
@Private
|
||||||
|
public static final boolean
|
||||||
|
DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO = true;
|
||||||
|
|
||||||
/** URI for FileSystemApplicationHistoryStore */
|
/** URI for FileSystemApplicationHistoryStore */
|
||||||
@Private
|
@Private
|
||||||
public static final String FS_APPLICATION_HISTORY_STORE_URI =
|
public static final String FS_APPLICATION_HISTORY_STORE_URI =
|
||||||
|
|
|
@ -266,12 +266,14 @@ public class AppBlock extends HtmlBlock {
|
||||||
@Override
|
@Override
|
||||||
public ContainerReport run() throws Exception {
|
public ContainerReport run() throws Exception {
|
||||||
ContainerReport report = null;
|
ContainerReport report = null;
|
||||||
|
if (request.getContainerId() != null) {
|
||||||
try {
|
try {
|
||||||
report = appBaseProt.getContainerReport(request)
|
report = appBaseProt.getContainerReport(request)
|
||||||
.getContainerReport();
|
.getContainerReport();
|
||||||
} catch (ContainerNotFoundException ex) {
|
} catch (ContainerNotFoundException ex) {
|
||||||
LOG.warn(ex.getMessage());
|
LOG.warn(ex.getMessage());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return report;
|
return report;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
@ -173,6 +174,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
.currentTimeMillis(), "");
|
.currentTimeMillis(), "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean saveNonAMContainerMetaInfo;
|
||||||
|
|
||||||
public RMContainerImpl(Container container,
|
public RMContainerImpl(Container container,
|
||||||
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
|
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
|
||||||
RMContext rmContext, String nodeLabelExpression) {
|
RMContext rmContext, String nodeLabelExpression) {
|
||||||
|
@ -201,10 +204,22 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
this.readLock = lock.readLock();
|
this.readLock = lock.readLock();
|
||||||
this.writeLock = lock.writeLock();
|
this.writeLock = lock.writeLock();
|
||||||
|
|
||||||
|
saveNonAMContainerMetaInfo = rmContext.getYarnConfiguration().getBoolean(
|
||||||
|
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
|
||||||
|
YarnConfiguration
|
||||||
|
.DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO);
|
||||||
|
|
||||||
rmContext.getRMApplicationHistoryWriter().containerStarted(this);
|
rmContext.getRMApplicationHistoryWriter().containerStarted(this);
|
||||||
|
|
||||||
|
// If saveNonAMContainerMetaInfo is true, store system metrics for all
|
||||||
|
// containers. If false, and if this container is marked as the AM, metrics
|
||||||
|
// will still be published for this container, but that calculation happens
|
||||||
|
// later.
|
||||||
|
if (saveNonAMContainerMetaInfo) {
|
||||||
rmContext.getSystemMetricsPublisher().containerCreated(
|
rmContext.getSystemMetricsPublisher().containerCreated(
|
||||||
this, this.creationTime);
|
this, this.creationTime);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ContainerId getContainerId() {
|
public ContainerId getContainerId() {
|
||||||
|
@ -376,6 +391,15 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Even if saveNonAMContainerMetaInfo is not true, the AM container's system
|
||||||
|
// metrics still need to be saved so that the AM's logs can be accessed.
|
||||||
|
// This call to getSystemMetricsPublisher().containerCreated() is mutually
|
||||||
|
// exclusive with the one in the RMContainerImpl constructor.
|
||||||
|
if (!saveNonAMContainerMetaInfo && this.isAMContainer) {
|
||||||
|
rmContext.getSystemMetricsPublisher().containerCreated(
|
||||||
|
this, this.creationTime);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -516,10 +540,21 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
|
|
||||||
container.rmContext.getRMApplicationHistoryWriter().containerFinished(
|
container.rmContext.getRMApplicationHistoryWriter().containerFinished(
|
||||||
container);
|
container);
|
||||||
|
|
||||||
|
boolean saveNonAMContainerMetaInfo =
|
||||||
|
container.rmContext.getYarnConfiguration().getBoolean(
|
||||||
|
YarnConfiguration
|
||||||
|
.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
|
||||||
|
YarnConfiguration
|
||||||
|
.DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO);
|
||||||
|
|
||||||
|
if (saveNonAMContainerMetaInfo || container.isAMContainer()) {
|
||||||
container.rmContext.getSystemMetricsPublisher().containerFinished(
|
container.rmContext.getSystemMetricsPublisher().containerFinished(
|
||||||
container, container.finishTime);
|
container, container.finishTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private static void updateAttemptMetrics(RMContainerImpl container) {
|
private static void updateAttemptMetrics(RMContainerImpl container) {
|
||||||
// If this is a preempted container, update preemption metrics
|
// If this is a preempted container, update preemption metrics
|
||||||
Resource resource = container.getContainer().getResource();
|
Resource resource = container.getContainer().getResource();
|
||||||
|
|
|
@ -1172,6 +1172,7 @@ public class TestClientRMService {
|
||||||
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
|
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
|
||||||
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
|
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
|
||||||
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
|
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
|
||||||
|
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
|
||||||
ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
|
ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
|
||||||
yarnScheduler);
|
yarnScheduler);
|
||||||
when(rmContext.getRMApps()).thenReturn(apps);
|
when(rmContext.getRMApps()).thenReturn(apps);
|
||||||
|
|
|
@ -19,12 +19,14 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.mockito.Mockito.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.anyLong;
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.reset;
|
import static org.mockito.Mockito.reset;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -270,4 +272,77 @@ public class TestRMContainerImpl {
|
||||||
Assert.assertNull(scheduler.getRMContainer(containerId2)
|
Assert.assertNull(scheduler.getRMContainer(containerId2)
|
||||||
.getResourceRequests());
|
.getResourceRequests());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 180000)
|
||||||
|
public void testStoreAllContainerMetrics() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
|
||||||
|
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
|
||||||
|
rm1.getRMContext().setSystemMetricsPublisher(publisher);
|
||||||
|
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
|
||||||
|
RMApp app1 = rm1.submitApp(1024);
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
|
||||||
|
|
||||||
|
// request a container.
|
||||||
|
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
|
||||||
|
ContainerId containerId2 = ContainerId.newContainerId(
|
||||||
|
am1.getApplicationAttemptId(), 2);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
|
||||||
|
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
|
||||||
|
.getAllocatedContainers();
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.COMPLETE);
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.COMPLETED);
|
||||||
|
rm1.stop();
|
||||||
|
|
||||||
|
// RMContainer should be publishing system metrics for all containers.
|
||||||
|
// Since there is 1 AM container and 1 non-AM container, there should be 2
|
||||||
|
// container created events and 2 container finished events.
|
||||||
|
verify(publisher, times(2)).containerCreated(any(RMContainer.class), anyLong());
|
||||||
|
verify(publisher, times(2)).containerFinished(any(RMContainer.class), anyLong());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 180000)
|
||||||
|
public void testStoreOnlyAMContainerMetrics() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
|
conf.setBoolean(
|
||||||
|
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
|
||||||
|
false);
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
|
||||||
|
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
|
||||||
|
rm1.getRMContext().setSystemMetricsPublisher(publisher);
|
||||||
|
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
|
||||||
|
RMApp app1 = rm1.submitApp(1024);
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
|
||||||
|
|
||||||
|
// request a container.
|
||||||
|
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
|
||||||
|
ContainerId containerId2 = ContainerId.newContainerId(
|
||||||
|
am1.getApplicationAttemptId(), 2);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
|
||||||
|
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
|
||||||
|
.getAllocatedContainers();
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.COMPLETE);
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.COMPLETED);
|
||||||
|
rm1.stop();
|
||||||
|
|
||||||
|
// RMContainer should be publishing system metrics only for AM container.
|
||||||
|
verify(publisher, times(1)).containerCreated(any(RMContainer.class), anyLong());
|
||||||
|
verify(publisher, times(1)).containerFinished(any(RMContainer.class), anyLong());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -270,6 +270,7 @@ public class TestChildQueueOrder {
|
||||||
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
|
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
|
||||||
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
|
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
|
||||||
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
|
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
|
||||||
|
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
|
||||||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||||
app_0.getApplicationId(), 1);
|
app_0.getApplicationId(), 1);
|
||||||
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
|
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
|
||||||
|
|
|
@ -168,6 +168,8 @@ public class TestLeafQueue {
|
||||||
cs.start();
|
cs.start();
|
||||||
|
|
||||||
when(spyRMContext.getScheduler()).thenReturn(cs);
|
when(spyRMContext.getScheduler()).thenReturn(cs);
|
||||||
|
when(spyRMContext.getYarnConfiguration())
|
||||||
|
.thenReturn(new YarnConfiguration());
|
||||||
when(cs.getNumClusterNodes()).thenReturn(3);
|
when(cs.getNumClusterNodes()).thenReturn(3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -129,6 +129,8 @@ public class TestReservations {
|
||||||
|
|
||||||
spyRMContext = spy(rmContext);
|
spyRMContext = spy(rmContext);
|
||||||
when(spyRMContext.getScheduler()).thenReturn(cs);
|
when(spyRMContext.getScheduler()).thenReturn(cs);
|
||||||
|
when(spyRMContext.getYarnConfiguration())
|
||||||
|
.thenReturn(new YarnConfiguration());
|
||||||
|
|
||||||
cs.setRMContext(spyRMContext);
|
cs.setRMContext(spyRMContext);
|
||||||
cs.init(csConf);
|
cs.init(csConf);
|
||||||
|
@ -642,6 +644,7 @@ public class TestReservations {
|
||||||
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
|
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
|
||||||
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
|
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
|
||||||
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
|
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
|
||||||
|
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
|
||||||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||||
app_0.getApplicationId(), 1);
|
app_0.getApplicationId(), 1);
|
||||||
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
|
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
|
||||||
|
@ -711,6 +714,7 @@ public class TestReservations {
|
||||||
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
|
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
|
||||||
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
|
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
|
||||||
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
|
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
|
||||||
|
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
|
||||||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||||
app_0.getApplicationId(), 1);
|
app_0.getApplicationId(), 1);
|
||||||
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
|
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
|
||||||
|
|
|
@ -222,6 +222,7 @@ public class TestFifoScheduler {
|
||||||
scheduler);
|
scheduler);
|
||||||
((RMContextImpl) rmContext).setSystemMetricsPublisher(
|
((RMContextImpl) rmContext).setSystemMetricsPublisher(
|
||||||
mock(SystemMetricsPublisher.class));
|
mock(SystemMetricsPublisher.class));
|
||||||
|
((RMContextImpl) rmContext).setYarnConfiguration(new YarnConfiguration());
|
||||||
|
|
||||||
scheduler.setRMContext(rmContext);
|
scheduler.setRMContext(rmContext);
|
||||||
scheduler.init(conf);
|
scheduler.init(conf);
|
||||||
|
@ -303,6 +304,7 @@ public class TestFifoScheduler {
|
||||||
scheduler);
|
scheduler);
|
||||||
((RMContextImpl) rmContext).setSystemMetricsPublisher(
|
((RMContextImpl) rmContext).setSystemMetricsPublisher(
|
||||||
mock(SystemMetricsPublisher.class));
|
mock(SystemMetricsPublisher.class));
|
||||||
|
((RMContextImpl) rmContext).setYarnConfiguration(new YarnConfiguration());
|
||||||
NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
|
NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
|
||||||
nlm.init(new Configuration());
|
nlm.init(new Configuration());
|
||||||
rmContext.setNodeLabelManager(nlm);
|
rmContext.setNodeLabelManager(nlm);
|
||||||
|
|
Loading…
Reference in New Issue