YARN-10200. Add number of containers to RMAppManager summary
(cherry picked from commit 2de0572cdc1c6fdbfaab108b169b2d5b0c077e86)
(cherry picked from commit 5d3fb0ebe9
)
This commit is contained in:
parent
dde8417972
commit
9c6dd8c83a
|
@ -218,7 +218,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
? ""
|
||||
: app.getApplicationSubmissionContext()
|
||||
.getNodeLabelExpression())
|
||||
.add("diagnostics", app.getDiagnostics());
|
||||
.add("diagnostics", app.getDiagnostics())
|
||||
.add("totalAllocatedContainers",
|
||||
metrics.getTotalAllocatedContainers());
|
||||
return summary;
|
||||
}
|
||||
|
||||
|
|
|
@ -898,7 +898,8 @@ public abstract class RMStateStore extends AbstractService {
|
|||
appAttempt.getMasterContainer(),
|
||||
credentials, appAttempt.getStartTime(),
|
||||
resUsage.getResourceUsageSecondsMap(),
|
||||
attempMetrics.getPreemptedResourceSecondsMap());
|
||||
attempMetrics.getPreemptedResourceSecondsMap(),
|
||||
attempMetrics.getTotalAllocatedContainers());
|
||||
|
||||
getRMStateStoreEventHandler().handle(
|
||||
new RMStateStoreAppAttemptEvent(attemptState));
|
||||
|
|
|
@ -46,7 +46,8 @@ public abstract class ApplicationAttemptStateData {
|
|||
String finalTrackingUrl, String diagnostics,
|
||||
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
|
||||
long finishTime, Map<String, Long> resourceSecondsMap,
|
||||
Map<String, Long> preemptedResourceSecondsMap) {
|
||||
Map<String, Long> preemptedResourceSecondsMap,
|
||||
int totalAllocatedContainers) {
|
||||
ApplicationAttemptStateData attemptStateData =
|
||||
Records.newRecord(ApplicationAttemptStateData.class);
|
||||
attemptStateData.setAttemptId(attemptId);
|
||||
|
@ -74,6 +75,7 @@ public abstract class ApplicationAttemptStateData {
|
|||
attemptStateData.setResourceSecondsMap(resourceSecondsMap);
|
||||
attemptStateData
|
||||
.setPreemptedResourceSecondsMap(preemptedResourceSecondsMap);
|
||||
attemptStateData.setTotalAllocatedContainers(totalAllocatedContainers);
|
||||
return attemptStateData;
|
||||
}
|
||||
|
||||
|
@ -81,10 +83,12 @@ public abstract class ApplicationAttemptStateData {
|
|||
ApplicationAttemptId attemptId, Container masterContainer,
|
||||
Credentials attemptTokens, long startTime,
|
||||
Map<String, Long> resourceSeondsMap,
|
||||
Map<String, Long> preemptedResourceSecondsMap) {
|
||||
Map<String, Long> preemptedResourceSecondsMap,
|
||||
int totalAllocatedContainers) {
|
||||
return newInstance(attemptId, masterContainer, attemptTokens, startTime,
|
||||
null, "N/A", "", null, ContainerExitStatus.INVALID, 0,
|
||||
resourceSeondsMap, preemptedResourceSecondsMap);
|
||||
resourceSeondsMap, preemptedResourceSecondsMap,
|
||||
totalAllocatedContainers);
|
||||
}
|
||||
|
||||
|
||||
|
@ -276,4 +280,24 @@ public abstract class ApplicationAttemptStateData {
|
|||
@Unstable
|
||||
public abstract void setPreemptedResourceSecondsMap(
|
||||
Map<String, Long> preemptedResourceSecondsMap);
|
||||
|
||||
/**
|
||||
* Get total number of containers allocated for this attempt.
|
||||
*
|
||||
* @return total number of containers allocated for this attempt.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract int getTotalAllocatedContainers();
|
||||
|
||||
/**
|
||||
* Set total number of containers allocated for this attempt.
|
||||
*
|
||||
* @param totalAllocatedContainers total number of containers
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setTotalAllocatedContainers(
|
||||
int totalAllocatedContainers);
|
||||
|
||||
}
|
||||
|
|
|
@ -454,4 +454,16 @@ public class ApplicationAttemptStateDataPBImpl extends
|
|||
.convertMapToStringLongMapProtoList(preemptedResourceSecondsMap));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTotalAllocatedContainers() {
|
||||
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getTotalAllocatedContainers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTotalAllocatedContainers(int totalAllocatedContainers) {
|
||||
maybeInitBuilder();
|
||||
builder.setTotalAllocatedContainers(totalAllocatedContainers);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1675,6 +1675,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
int numNonAMContainerPreempted = 0;
|
||||
Map<String, Long> resourceSecondsMap = new HashMap<>();
|
||||
Map<String, Long> preemptedSecondsMap = new HashMap<>();
|
||||
int totalAllocatedContainers = 0;
|
||||
this.readLock.lock();
|
||||
try {
|
||||
for (RMAppAttempt attempt : attempts.values()) {
|
||||
|
@ -1704,6 +1705,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
value += entry.getValue();
|
||||
preemptedSecondsMap.put(entry.getKey(), value);
|
||||
}
|
||||
totalAllocatedContainers +=
|
||||
attemptMetrics.getTotalAllocatedContainers();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -1711,7 +1714,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
}
|
||||
|
||||
return new RMAppMetrics(resourcePreempted, numNonAMContainerPreempted,
|
||||
numAMContainerPreempted, resourceSecondsMap, preemptedSecondsMap);
|
||||
numAMContainerPreempted, resourceSecondsMap, preemptedSecondsMap,
|
||||
totalAllocatedContainers);
|
||||
}
|
||||
|
||||
@Private
|
||||
|
|
|
@ -30,16 +30,19 @@ public class RMAppMetrics {
|
|||
final int numAMContainersPreempted;
|
||||
private final Map<String, Long> resourceSecondsMap;
|
||||
private final Map<String, Long> preemptedResourceSecondsMap;
|
||||
private int totalAllocatedContainers;
|
||||
|
||||
public RMAppMetrics(Resource resourcePreempted,
|
||||
int numNonAMContainersPreempted, int numAMContainersPreempted,
|
||||
Map<String, Long> resourceSecondsMap,
|
||||
Map<String, Long> preemptedResourceSecondsMap) {
|
||||
Map<String, Long> preemptedResourceSecondsMap,
|
||||
int totalAllocatedContainers) {
|
||||
this.resourcePreempted = resourcePreempted;
|
||||
this.numNonAMContainersPreempted = numNonAMContainersPreempted;
|
||||
this.numAMContainersPreempted = numAMContainersPreempted;
|
||||
this.resourceSecondsMap = resourceSecondsMap;
|
||||
this.preemptedResourceSecondsMap = preemptedResourceSecondsMap;
|
||||
this.totalAllocatedContainers = totalAllocatedContainers;
|
||||
}
|
||||
|
||||
public Resource getResourcePreempted() {
|
||||
|
@ -83,4 +86,7 @@ public class RMAppMetrics {
|
|||
return preemptedResourceSecondsMap;
|
||||
}
|
||||
|
||||
public int getTotalAllocatedContainers() {
|
||||
return totalAllocatedContainers;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -991,6 +991,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
.updateAggregateAppResourceUsage(attemptState.getResourceSecondsMap());
|
||||
this.attemptMetrics.updateAggregatePreemptedAppResourceUsage(
|
||||
attemptState.getPreemptedResourceSecondsMap());
|
||||
this.attemptMetrics.setTotalAllocatedContainers(
|
||||
attemptState.getTotalAllocatedContainers());
|
||||
}
|
||||
|
||||
public void transferStateFromAttempt(RMAppAttempt attempt) {
|
||||
|
@ -1424,7 +1426,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
rmStore.getCredentialsFromAppAttempt(this), startTime,
|
||||
stateToBeStored, finalTrackingUrl, diags.toString(), finalStatus, exitStatus,
|
||||
getFinishTime(), resUsage.getResourceUsageSecondsMap(),
|
||||
this.attemptMetrics.getPreemptedResourceSecondsMap());
|
||||
this.attemptMetrics.getPreemptedResourceSecondsMap(),
|
||||
this.attemptMetrics.getTotalAllocatedContainers());
|
||||
LOG.info("Updating application attempt " + applicationAttemptId
|
||||
+ " with final state: " + targetedFinalState + ", and exit status: "
|
||||
+ exitStatus);
|
||||
|
@ -1855,7 +1858,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
ContainerExitStatus.INVALID, appAttempt.getFinishTime(),
|
||||
appAttempt.attemptMetrics.getAggregateAppResourceUsage()
|
||||
.getResourceUsageSecondsMap(),
|
||||
appAttempt.attemptMetrics.getPreemptedResourceSecondsMap());
|
||||
appAttempt.attemptMetrics.getPreemptedResourceSecondsMap(),
|
||||
appAttempt.attemptMetrics.getTotalAllocatedContainers());
|
||||
appAttempt.rmContext.getStateStore()
|
||||
.updateApplicationAttemptState(attemptState);
|
||||
}
|
||||
|
|
|
@ -230,6 +230,10 @@ public class RMAppAttemptMetrics {
|
|||
return this.totalAllocatedContainers;
|
||||
}
|
||||
|
||||
public void setTotalAllocatedContainers(int totalAllocatedContainers) {
|
||||
this.totalAllocatedContainers = totalAllocatedContainers;
|
||||
}
|
||||
|
||||
public Resource getApplicationAttemptHeadroom() {
|
||||
return Resource.newInstance(applicationHeadroom);
|
||||
}
|
||||
|
|
|
@ -90,6 +90,7 @@ message ApplicationAttemptStateDataProto {
|
|||
optional int64 preempted_vcore_seconds = 14;
|
||||
repeated StringLongMapProto application_resource_usage_map = 15;
|
||||
repeated StringLongMapProto preempted_resource_usage_map = 16;
|
||||
optional int32 total_allocated_containers = 17;
|
||||
}
|
||||
|
||||
message EpochProto {
|
||||
|
|
|
@ -1009,7 +1009,7 @@ public class TestAppManager{
|
|||
resourceSecondsMap.put(ResourceInformation.VCORES.getName(), 64L);
|
||||
RMAppMetrics metrics =
|
||||
new RMAppMetrics(Resource.newInstance(1234, 56),
|
||||
10, 1, resourceSecondsMap, new HashMap<>());
|
||||
10, 1, resourceSecondsMap, new HashMap<>(), 1234);
|
||||
when(app.getRMAppMetrics()).thenReturn(metrics);
|
||||
when(app.getDiagnostics()).thenReturn(new StringBuilder(
|
||||
"Multiline\n\n\r\rDiagnostics=Diagn,ostic"));
|
||||
|
@ -1037,6 +1037,7 @@ public class TestAppManager{
|
|||
assertTrue(msg.contains("applicationNodeLabel=test"));
|
||||
assertTrue(msg.contains("diagnostics=Multiline" + escaped
|
||||
+ "Diagnostics\\=Diagn\\,ostic"));
|
||||
assertTrue(msg.contains("totalAllocatedContainers=1234"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -229,6 +229,8 @@ public class TestContainerResourceUsage {
|
|||
memorySeconds, metricsBefore.getMemorySeconds());
|
||||
Assert.assertEquals("Unexpected VcoreSeconds value",
|
||||
vcoreSeconds, metricsBefore.getVcoreSeconds());
|
||||
Assert.assertEquals("Unexpected totalAllocatedContainers value",
|
||||
NUM_CONTAINERS + 1, metricsBefore.getTotalAllocatedContainers());
|
||||
|
||||
// create new RM to represent RM restart. Load up the state store.
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
|
@ -242,6 +244,9 @@ public class TestContainerResourceUsage {
|
|||
metricsBefore.getVcoreSeconds(), metricsAfter.getVcoreSeconds());
|
||||
Assert.assertEquals("Memory seconds were not the same after RM Restart",
|
||||
metricsBefore.getMemorySeconds(), metricsAfter.getMemorySeconds());
|
||||
Assert.assertEquals("TotalAllocatedContainers was not the same after " +
|
||||
"RM Restart", metricsBefore.getTotalAllocatedContainers(),
|
||||
metricsAfter.getTotalAllocatedContainers());
|
||||
|
||||
rm0.stop();
|
||||
rm0.close();
|
||||
|
|
|
@ -198,7 +198,7 @@ public abstract class MockAsm extends MockApps {
|
|||
@Override
|
||||
public RMAppMetrics getRMAppMetrics() {
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, new HashMap<>(),
|
||||
new HashMap<>());
|
||||
new HashMap<>(), 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -479,7 +479,7 @@ public class TestCombinedSystemMetricsPublisher {
|
|||
preemptedMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
|
||||
when(rmApp.getRMAppMetrics()).thenReturn(
|
||||
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, resourceMap,
|
||||
preemptedMap));
|
||||
preemptedMap, 0));
|
||||
when(rmApp.getApplicationTags()).thenReturn(
|
||||
Collections.<String> emptySet());
|
||||
ApplicationSubmissionContext appSubmissionContext =
|
||||
|
|
|
@ -517,7 +517,8 @@ public class TestSystemMetricsPublisher {
|
|||
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
|
||||
preemptedMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
|
||||
when(app.getRMAppMetrics())
|
||||
.thenReturn(new RMAppMetrics(null, 0, 0, resourceMap, preemptedMap));
|
||||
.thenReturn(new RMAppMetrics(null, 0, 0, resourceMap, preemptedMap,
|
||||
0));
|
||||
Set<String> appTags = new HashSet<String>();
|
||||
appTags.add("test");
|
||||
appTags.add("tags");
|
||||
|
|
|
@ -419,7 +419,7 @@ public class TestSystemMetricsPublisherForV2 {
|
|||
.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
|
||||
when(app.getRMAppMetrics()).thenReturn(
|
||||
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, resourceSecondsMap,
|
||||
new HashMap<>()));
|
||||
new HashMap<>(), 0));
|
||||
when(app.getApplicationTags()).thenReturn(Collections.<String>emptySet());
|
||||
ApplicationSubmissionContext appSubmissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
|
|
|
@ -371,7 +371,8 @@ public class RMStateStoreTestBase {
|
|||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics",
|
||||
FinalApplicationStatus.SUCCEEDED, 100,
|
||||
oldAttemptState.getFinishTime(), new HashMap<>(), new HashMap<>());
|
||||
oldAttemptState.getFinishTime(), new HashMap<>(), new HashMap<>(),
|
||||
0);
|
||||
store.updateApplicationAttemptState(newAttemptState);
|
||||
|
||||
// test updating the state of an app/attempt whose initial state was not
|
||||
|
@ -396,7 +397,8 @@ public class RMStateStoreTestBase {
|
|||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics",
|
||||
FinalApplicationStatus.SUCCEEDED, 111,
|
||||
oldAttemptState.getFinishTime(), new HashMap<>(), new HashMap<>());
|
||||
oldAttemptState.getFinishTime(), new HashMap<>(), new HashMap<>(),
|
||||
0);
|
||||
store.updateApplicationAttemptState(dummyAttempt);
|
||||
|
||||
// let things settle down
|
||||
|
|
|
@ -566,7 +566,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||
store.getCredentialsFromAppAttempt(mockAttempt),
|
||||
startTime, RMAppAttemptState.FINISHED, "testUrl",
|
||||
"test", FinalApplicationStatus.SUCCEEDED, 100,
|
||||
finishTime, new HashMap<>(), new HashMap<>());
|
||||
finishTime, new HashMap<>(), new HashMap<>(), 0);
|
||||
store.updateApplicationAttemptState(newAttemptState);
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
@ -816,7 +816,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||
return ApplicationAttemptStateData.newInstance(attemptId,
|
||||
container, null, startTime, RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
|
||||
amExitStatus, 0, resourceSecondsMap, preemptedResoureSecondsMap);
|
||||
amExitStatus, 0, resourceSecondsMap, preemptedResoureSecondsMap, 0);
|
||||
}
|
||||
|
||||
private ApplicationAttemptId storeAttempt(RMStateStore store,
|
||||
|
|
|
@ -66,7 +66,7 @@ public class TestAppPage {
|
|||
|
||||
RMAppMetrics appMetrics =
|
||||
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, new HashMap<>(),
|
||||
new HashMap<>());
|
||||
new HashMap<>(), 0);
|
||||
when(app.getRMAppMetrics()).thenReturn(appMetrics);
|
||||
|
||||
// initialize RM Context, and create RMApp, without creating RMAppAttempt
|
||||
|
|
|
@ -138,7 +138,7 @@ public class TestRMWebAppFairScheduler {
|
|||
@Override
|
||||
public RMAppMetrics getRMAppMetrics() {
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0,
|
||||
new HashMap<>(), new HashMap<>());
|
||||
new HashMap<>(), new HashMap<>(), 0);
|
||||
}
|
||||
@Override
|
||||
public YarnApplicationState createApplicationState() {
|
||||
|
|
Loading…
Reference in New Issue