YARN-10200. Add number of containers to RMAppManager summary

(cherry picked from commit 2de0572cdc1c6fdbfaab108b169b2d5b0c077e86)
(cherry picked from commit 5d3fb0ebe9)
(cherry picked from commit 9c6dd8c83a)
This commit is contained in:
Jonathan Hung 2020-03-23 14:35:43 -07:00
parent 2c321c3c30
commit 1c8529f030
19 changed files with 85 additions and 21 deletions

View File

@ -209,7 +209,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
? ""
: app.getApplicationSubmissionContext()
.getNodeLabelExpression())
.add("diagnostics", app.getDiagnostics());
.add("diagnostics", app.getDiagnostics())
.add("totalAllocatedContainers",
metrics.getTotalAllocatedContainers());
return summary;
}

View File

@ -861,7 +861,8 @@ public abstract class RMStateStore extends AbstractService {
appAttempt.getMasterContainer(),
credentials, appAttempt.getStartTime(),
resUsage.getResourceUsageSecondsMap(),
attempMetrics.getPreemptedResourceSecondsMap());
attempMetrics.getPreemptedResourceSecondsMap(),
attempMetrics.getTotalAllocatedContainers());
getRMStateStoreEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));

View File

@ -46,7 +46,8 @@ public abstract class ApplicationAttemptStateData {
String finalTrackingUrl, String diagnostics,
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
long finishTime, Map<String, Long> resourceSecondsMap,
Map<String, Long> preemptedResourceSecondsMap) {
Map<String, Long> preemptedResourceSecondsMap,
int totalAllocatedContainers) {
ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
@ -74,6 +75,7 @@ public abstract class ApplicationAttemptStateData {
attemptStateData.setResourceSecondsMap(resourceSecondsMap);
attemptStateData
.setPreemptedResourceSecondsMap(preemptedResourceSecondsMap);
attemptStateData.setTotalAllocatedContainers(totalAllocatedContainers);
return attemptStateData;
}
@ -81,10 +83,12 @@ public abstract class ApplicationAttemptStateData {
ApplicationAttemptId attemptId, Container masterContainer,
Credentials attemptTokens, long startTime,
Map<String, Long> resourceSeondsMap,
Map<String, Long> preemptedResourceSecondsMap) {
Map<String, Long> preemptedResourceSecondsMap,
int totalAllocatedContainers) {
return newInstance(attemptId, masterContainer, attemptTokens, startTime,
null, "N/A", "", null, ContainerExitStatus.INVALID, 0,
resourceSeondsMap, preemptedResourceSecondsMap);
resourceSeondsMap, preemptedResourceSecondsMap,
totalAllocatedContainers);
}
@ -276,4 +280,24 @@ public abstract class ApplicationAttemptStateData {
@Unstable
public abstract void setPreemptedResourceSecondsMap(
Map<String, Long> preemptedResourceSecondsMap);
/**
* Get total number of containers allocated for this attempt.
*
* @return total number of containers allocated for this attempt.
*/
@Public
@Unstable
public abstract int getTotalAllocatedContainers();
/**
* Set total number of containers allocated for this attempt.
*
* @param totalAllocatedContainers total number of containers
*/
@Public
@Unstable
public abstract void setTotalAllocatedContainers(
int totalAllocatedContainers);
}

View File

@ -454,4 +454,16 @@ public class ApplicationAttemptStateDataPBImpl extends
.convertMapToStringLongMapProtoList(preemptedResourceSecondsMap));
}
}
@Override
public int getTotalAllocatedContainers() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
return p.getTotalAllocatedContainers();
}
@Override
public void setTotalAllocatedContainers(int totalAllocatedContainers) {
maybeInitBuilder();
builder.setTotalAllocatedContainers(totalAllocatedContainers);
}
}

View File

@ -1654,7 +1654,7 @@ public class RMAppImpl implements RMApp, Recoverable {
int numNonAMContainerPreempted = 0;
Map<String, Long> resourceSecondsMap = new HashMap<>();
Map<String, Long> preemptedSecondsMap = new HashMap<>();
int totalAllocatedContainers = 0;
this.readLock.lock();
try {
for (RMAppAttempt attempt : attempts.values()) {
@ -1684,6 +1684,8 @@ public class RMAppImpl implements RMApp, Recoverable {
value += entry.getValue();
preemptedSecondsMap.put(entry.getKey(), value);
}
totalAllocatedContainers +=
attemptMetrics.getTotalAllocatedContainers();
}
}
} finally {
@ -1691,7 +1693,8 @@ public class RMAppImpl implements RMApp, Recoverable {
}
return new RMAppMetrics(resourcePreempted, numNonAMContainerPreempted,
numAMContainerPreempted, resourceSecondsMap, preemptedSecondsMap);
numAMContainerPreempted, resourceSecondsMap, preemptedSecondsMap,
totalAllocatedContainers);
}
@Private

View File

@ -30,16 +30,19 @@ public class RMAppMetrics {
final int numAMContainersPreempted;
private final Map<String, Long> resourceSecondsMap;
private final Map<String, Long> preemptedResourceSecondsMap;
private int totalAllocatedContainers;
public RMAppMetrics(Resource resourcePreempted,
int numNonAMContainersPreempted, int numAMContainersPreempted,
Map<String, Long> resourceSecondsMap,
Map<String, Long> preemptedResourceSecondsMap) {
Map<String, Long> preemptedResourceSecondsMap,
int totalAllocatedContainers) {
this.resourcePreempted = resourcePreempted;
this.numNonAMContainersPreempted = numNonAMContainersPreempted;
this.numAMContainersPreempted = numAMContainersPreempted;
this.resourceSecondsMap = resourceSecondsMap;
this.preemptedResourceSecondsMap = preemptedResourceSecondsMap;
this.totalAllocatedContainers = totalAllocatedContainers;
}
public Resource getResourcePreempted() {
@ -83,4 +86,7 @@ public class RMAppMetrics {
return preemptedResourceSecondsMap;
}
public int getTotalAllocatedContainers() {
return totalAllocatedContainers;
}
}

View File

@ -982,6 +982,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
.updateAggregateAppResourceUsage(attemptState.getResourceSecondsMap());
this.attemptMetrics.updateAggregatePreemptedAppResourceUsage(
attemptState.getPreemptedResourceSecondsMap());
this.attemptMetrics.setTotalAllocatedContainers(
attemptState.getTotalAllocatedContainers());
}
public void transferStateFromAttempt(RMAppAttempt attempt) {
@ -1362,7 +1364,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
rmStore.getCredentialsFromAppAttempt(this), startTime,
stateToBeStored, finalTrackingUrl, diags.toString(), finalStatus, exitStatus,
getFinishTime(), resUsage.getResourceUsageSecondsMap(),
this.attemptMetrics.getPreemptedResourceSecondsMap());
this.attemptMetrics.getPreemptedResourceSecondsMap(),
this.attemptMetrics.getTotalAllocatedContainers());
LOG.info("Updating application attempt " + applicationAttemptId
+ " with final state: " + targetedFinalState + ", and exit status: "
+ exitStatus);
@ -1824,7 +1827,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ContainerExitStatus.INVALID, appAttempt.getFinishTime(),
appAttempt.attemptMetrics.getAggregateAppResourceUsage()
.getResourceUsageSecondsMap(),
appAttempt.attemptMetrics.getPreemptedResourceSecondsMap());
appAttempt.attemptMetrics.getPreemptedResourceSecondsMap(),
appAttempt.attemptMetrics.getTotalAllocatedContainers());
appAttempt.rmContext.getStateStore()
.updateApplicationAttemptState(attemptState);
}

View File

@ -229,6 +229,10 @@ public class RMAppAttemptMetrics {
return this.totalAllocatedContainers;
}
public void setTotalAllocatedContainers(int totalAllocatedContainers) {
this.totalAllocatedContainers = totalAllocatedContainers;
}
public Resource getApplicationAttemptHeadroom() {
return Resource.newInstance(applicationHeadroom);
}

View File

@ -90,6 +90,7 @@ message ApplicationAttemptStateDataProto {
optional int64 preempted_vcore_seconds = 14;
repeated StringLongMapProto application_resource_usage_map = 15;
repeated StringLongMapProto preempted_resource_usage_map = 16;
optional int32 total_allocated_containers = 17;
}
message EpochProto {

View File

@ -842,7 +842,7 @@ public class TestAppManager{
resourceSecondsMap.put(ResourceInformation.VCORES.getName(), 64L);
RMAppMetrics metrics =
new RMAppMetrics(Resource.newInstance(1234, 56),
10, 1, resourceSecondsMap, new HashMap<String, Long>());
10, 1, resourceSecondsMap, new HashMap<String, Long>(), 1234);
when(app.getRMAppMetrics()).thenReturn(metrics);
when(app.getDiagnostics()).thenReturn(new StringBuilder(
"Multiline\n\n\r\rDiagnostics=Diagn,ostic"));
@ -870,6 +870,7 @@ public class TestAppManager{
Assert.assertTrue(msg.contains("applicationNodeLabel=test"));
Assert.assertTrue(msg.contains("diagnostics=Multiline" + escaped
+ "Diagnostics\\=Diagn\\,ostic"));
Assert.assertTrue(msg.contains("totalAllocatedContainers=1234"));
}
@Test

View File

@ -229,6 +229,8 @@ public class TestContainerResourceUsage {
memorySeconds, metricsBefore.getMemorySeconds());
Assert.assertEquals("Unexpected VcoreSeconds value",
vcoreSeconds, metricsBefore.getVcoreSeconds());
Assert.assertEquals("Unexpected totalAllocatedContainers value",
NUM_CONTAINERS + 1, metricsBefore.getTotalAllocatedContainers());
// create new RM to represent RM restart. Load up the state store.
MockRM rm1 = new MockRM(conf, memStore);
@ -242,6 +244,9 @@ public class TestContainerResourceUsage {
metricsBefore.getVcoreSeconds(), metricsAfter.getVcoreSeconds());
Assert.assertEquals("Memory seconds were not the same after RM Restart",
metricsBefore.getMemorySeconds(), metricsAfter.getMemorySeconds());
Assert.assertEquals("TotalAllocatedContainers was not the same after " +
"RM Restart", metricsBefore.getTotalAllocatedContainers(),
metricsAfter.getTotalAllocatedContainers());
rm0.stop();
rm0.close();

View File

@ -196,7 +196,7 @@ public abstract class MockAsm extends MockApps {
@Override
public RMAppMetrics getRMAppMetrics() {
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, new HashMap<String, Long>(),
new HashMap<String, Long>());
new HashMap<String, Long>(), 0);
}
@Override

View File

@ -493,7 +493,7 @@ public class TestCombinedSystemMetricsPublisher {
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
when(rmApp.getRMAppMetrics()).thenReturn(
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, resourceMap,
preemptedMap));
preemptedMap, 0));
when(rmApp.getApplicationTags()).thenReturn(
Collections.<String> emptySet());
ApplicationSubmissionContext appSubmissionContext =

View File

@ -516,7 +516,8 @@ public class TestSystemMetricsPublisher {
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
preemptedMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
when(app.getRMAppMetrics())
.thenReturn(new RMAppMetrics(null, 0, 0, resourceMap, preemptedMap));
.thenReturn(new RMAppMetrics(null, 0, 0, resourceMap, preemptedMap,
0));
Set<String> appTags = new HashSet<String>();
appTags.add("test");
appTags.add("tags");

View File

@ -373,7 +373,7 @@ public class TestSystemMetricsPublisherForV2 {
.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
when(app.getRMAppMetrics()).thenReturn(
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, resourceSecondsMap,
new HashMap<String, Long>()));
new HashMap<String, Long>(), 0));
when(app.getApplicationTags()).thenReturn(Collections.<String>emptySet());
ApplicationSubmissionContext appSubmissionContext =
mock(ApplicationSubmissionContext.class);

View File

@ -369,7 +369,7 @@ public class RMStateStoreTestBase {
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 100,
oldAttemptState.getFinishTime(), new HashMap<String, Long>(), new HashMap<String, Long>());
oldAttemptState.getFinishTime(), new HashMap<String, Long>(), new HashMap<String, Long>(), 0);
store.updateApplicationAttemptState(newAttemptState);
// test updating the state of an app/attempt whose initial state was not
@ -393,7 +393,7 @@ public class RMStateStoreTestBase {
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 111,
oldAttemptState.getFinishTime(), new HashMap<String, Long>(), new HashMap<String, Long>());
oldAttemptState.getFinishTime(), new HashMap<String, Long>(), new HashMap<String, Long>(), 0);
store.updateApplicationAttemptState(dummyAttempt);
// let things settle down

View File

@ -561,7 +561,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
store.getCredentialsFromAppAttempt(mockAttempt),
startTime, RMAppAttemptState.FINISHED, "testUrl",
"test", FinalApplicationStatus.SUCCEEDED, 100,
finishTime, new HashMap<String, Long>(), new HashMap<String, Long>());
finishTime, new HashMap<String, Long>(), new HashMap<String, Long>(), 0);
store.updateApplicationAttemptState(newAttemptState);
assertEquals("RMStateStore should have been in fenced state",
true, store.isFencedState());
@ -811,7 +811,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
return ApplicationAttemptStateData.newInstance(attemptId,
container, null, startTime, RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
amExitStatus, 0, resourceSecondsMap, preemptedResoureSecondsMap);
amExitStatus, 0, resourceSecondsMap, preemptedResoureSecondsMap, 0);
}
private ApplicationAttemptId storeAttempt(RMStateStore store,

View File

@ -66,7 +66,7 @@ public class TestAppPage {
RMAppMetrics appMetrics =
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, new HashMap<String, Long>(),
new HashMap<String, Long>());
new HashMap<String, Long>(), 0);
when(app.getRMAppMetrics()).thenReturn(appMetrics);
// initialize RM Context, and create RMApp, without creating RMAppAttempt

View File

@ -138,7 +138,7 @@ public class TestRMWebAppFairScheduler {
@Override
public RMAppMetrics getRMAppMetrics() {
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0,
new HashMap<String, Long>(), new HashMap<String, Long>());
new HashMap<String, Long>(), new HashMap<String, Long>(), 0);
}
@Override
public YarnApplicationState createApplicationState() {