YARN-6232. Update resource usage and preempted resource calculations to take into account all resource types. Contributed by Varun Vasudev.

This commit is contained in:
Sunil G 2017-03-06 11:34:20 +05:30 committed by Wangda Tan
parent 7805deed48
commit dae65f3bef
41 changed files with 868 additions and 253 deletions

View File

@ -24,6 +24,9 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
import java.util.HashMap;
import java.util.Map;
/**
* Contains various scheduling metrics to be reported by UI and CLI.
*/
@ -35,9 +38,9 @@ public abstract class ApplicationResourceUsageReport {
@Unstable
public static ApplicationResourceUsageReport newInstance(
int numUsedContainers, int numReservedContainers, Resource usedResources,
Resource reservedResources, Resource neededResources, long memorySeconds,
long vcoreSeconds, float queueUsagePerc, float clusterUsagePerc,
long preemptedMemorySeconds, long preemptedVcoresSeconds) {
Resource reservedResources, Resource neededResources,
Map<String, Long> resourceSecondsMap, float queueUsagePerc,
float clusterUsagePerc, Map<String, Long> preemtedResourceSecondsMap) {
ApplicationResourceUsageReport report =
Records.newRecord(ApplicationResourceUsageReport.class);
report.setNumUsedContainers(numUsedContainers);
@ -45,12 +48,10 @@ public static ApplicationResourceUsageReport newInstance(
report.setUsedResources(usedResources);
report.setReservedResources(reservedResources);
report.setNeededResources(neededResources);
report.setMemorySeconds(memorySeconds);
report.setVcoreSeconds(vcoreSeconds);
report.setResourceSecondsMap(resourceSecondsMap);
report.setQueueUsagePercentage(queueUsagePerc);
report.setClusterUsagePercentage(clusterUsagePerc);
report.setPreemptedMemorySeconds(preemptedMemorySeconds);
report.setPreemptedVcoreSeconds(preemptedVcoresSeconds);
report.setPreemptedResourceSecondsMap(preemtedResourceSecondsMap);
return report;
}
@ -229,4 +230,47 @@ public static ApplicationResourceUsageReport newInstance(
@Public
@Unstable
public abstract long getPreemptedVcoreSeconds();
/**
* Get the aggregated number of resources that the application has
* allocated times the number of seconds the application has been running.
* @return map containing the resource name and aggregated resource-seconds
*/
@Public
@Unstable
public abstract Map<String, Long> getResourceSecondsMap();
/**
* Set the aggregated number of resources that the application has
* allocated times the number of seconds the application has been running.
* @param resourceSecondsMap map containing the resource name and aggregated
* resource-seconds
*/
@Private
@Unstable
public abstract void setResourceSecondsMap(
Map<String, Long> resourceSecondsMap);
/**
* Get the aggregated number of resources preempted that the application has
* allocated times the number of seconds the application has been running.
* @return map containing the resource name and aggregated preempted
* resource-seconds
*/
@Public
@Unstable
public abstract Map<String, Long> getPreemptedResourceSecondsMap();
/**
* Set the aggregated number of resources preempted that the application has
* allocated times the number of seconds the application has been running.
* @param preemptedResourceSecondsMap map containing the resource name and
* aggregated preempted resource-seconds
*/
@Private
@Unstable
public abstract void setPreemptedResourceSecondsMap(
Map<String, Long> preemptedResourceSecondsMap);
}

View File

@ -195,6 +195,11 @@ message LocalResourceProto {
optional bool should_be_uploaded_to_shared_cache = 7;
}
message StringLongMapProto {
required string key = 1;
required int64 value = 2;
}
message ApplicationResourceUsageReportProto {
optional int32 num_used_containers = 1;
optional int32 num_reserved_containers = 2;
@ -207,6 +212,8 @@ message ApplicationResourceUsageReportProto {
optional float cluster_usage_percentage = 9;
optional int64 preempted_memory_seconds = 10;
optional int64 preempted_vcore_seconds = 11;
repeated StringLongMapProto application_resource_usage_map = 12;
repeated StringLongMapProto application_preempted_resource_usage_map = 13;
}
message ApplicationReportProto {

View File

@ -62,6 +62,8 @@
import com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.yarn.util.StringHelper.getResourceSecondsString;
@Private
@Unstable
public class ApplicationCLI extends YarnCLI {
@ -711,24 +713,9 @@ private int printApplicationReport(String applicationId)
appReportStr.println(appReport.getRpcPort());
appReportStr.print("\tAM Host : ");
appReportStr.println(appReport.getHost());
appReportStr.print("\tAggregate Resource Allocation : ");
ApplicationResourceUsageReport usageReport =
appReport.getApplicationResourceUsageReport();
if (usageReport != null) {
//completed app report in the timeline server doesn't have usage report
appReportStr.print(usageReport.getMemorySeconds() + " MB-seconds, ");
appReportStr.println(usageReport.getVcoreSeconds() + " vcore-seconds");
appReportStr.print("\tAggregate Resource Preempted : ");
appReportStr.print(usageReport.getPreemptedMemorySeconds() +
" MB-seconds, ");
appReportStr.println(usageReport.getPreemptedVcoreSeconds() +
" vcore-seconds");
} else {
appReportStr.println("N/A");
appReportStr.print("\tAggregate Resource Preempted : ");
appReportStr.println("N/A");
}
printResourceUsage(appReportStr, usageReport);
appReportStr.print("\tLog Aggregation Status : ");
appReportStr.println(appReport.getLogAggregationStatus() == null ? "N/A"
: appReport.getLogAggregationStatus());
@ -759,6 +746,22 @@ private int printApplicationReport(String applicationId)
return 0;
}
private void printResourceUsage(PrintWriter appReportStr,
ApplicationResourceUsageReport usageReport) {
appReportStr.print("\tAggregate Resource Allocation : ");
if (usageReport != null) {
appReportStr.println(
getResourceSecondsString(usageReport.getResourceSecondsMap()));
appReportStr.print("\tAggregate Resource Preempted : ");
appReportStr.println(getResourceSecondsString(
usageReport.getPreemptedResourceSecondsMap()));
} else {
appReportStr.println("N/A");
appReportStr.print("\tAggregate Resource Preempted : ");
appReportStr.println("N/A");
}
}
private String getAllValidApplicationStates() {
StringBuilder sb = new StringBuilder();
sb.append("The valid application state can be" + " one of the following: ");

View File

@ -39,8 +39,10 @@
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@ -69,6 +71,7 @@
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
@ -118,9 +121,18 @@ public void testGetApplicationReport() throws Exception {
for (int i = 0; i < 2; ++i) {
ApplicationCLI cli = createAndGetAppCLI();
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
Map<String, Long> resourceSecondsMap = new HashMap<>();
Map<String, Long> preemptedResoureSecondsMap = new HashMap<>();
resourceSecondsMap.put(ResourceInformation.MEMORY_MB.getName(), 123456L);
resourceSecondsMap.put(ResourceInformation.VCORES.getName(), 4567L);
preemptedResoureSecondsMap
.put(ResourceInformation.MEMORY_MB.getName(), 1111L);
preemptedResoureSecondsMap
.put(ResourceInformation.VCORES.getName(), 2222L);
ApplicationResourceUsageReport usageReport = i == 0 ? null :
ApplicationResourceUsageReport.newInstance(
2, 0, null, null, null, 123456, 4567, 0, 0, 1111, 2222);
ApplicationResourceUsageReport
.newInstance(2, 0, null, null, null, resourceSecondsMap, 0, 0,
preemptedResoureSecondsMap);
ApplicationReport newApplicationReport = ApplicationReport.newInstance(
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,

View File

@ -22,12 +22,16 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import com.google.protobuf.TextFormat;
import java.util.HashMap;
import java.util.Map;
@Private
@Unstable
public class ApplicationResourceUsageReportPBImpl
@ -41,6 +45,9 @@ public class ApplicationResourceUsageReportPBImpl
Resource reservedResources;
Resource neededResources;
private Map<String, Long> resourceSecondsMap;
private Map<String, Long> preemptedResourceSecondsMap;
public ApplicationResourceUsageReportPBImpl() {
builder = ApplicationResourceUsageReportProto.newBuilder();
}
@ -49,6 +56,8 @@ public ApplicationResourceUsageReportPBImpl(
ApplicationResourceUsageReportProto proto) {
this.proto = proto;
viaProto = true;
getResourceSecondsMap();
getPreemptedResourceSecondsMap();
}
public synchronized ApplicationResourceUsageReportProto getProto() {
@ -89,6 +98,23 @@ private void mergeLocalToBuilder() {
if (this.neededResources != null) {
builder.setNeededResources(convertToProtoFormat(this.neededResources));
}
builder.clearApplicationResourceUsageMap();
builder.clearApplicationPreemptedResourceUsageMap();
if (preemptedResourceSecondsMap != null && !preemptedResourceSecondsMap
.isEmpty()) {
builder.addAllApplicationPreemptedResourceUsageMap(ProtoUtils
.convertMapToStringLongMapProtoList(preemptedResourceSecondsMap));
}
if (resourceSecondsMap != null && !resourceSecondsMap.isEmpty()) {
builder.addAllApplicationResourceUsageMap(
ProtoUtils.convertMapToStringLongMapProtoList(resourceSecondsMap));
}
builder.setMemorySeconds(this.getMemorySeconds());
builder.setVcoreSeconds(this.getVcoreSeconds());
builder.setPreemptedMemorySeconds(this.getPreemptedMemorySeconds());
builder.setPreemptedVcoreSeconds(this.getPreemptedVcoreSeconds());
}
private void mergeLocalToProto() {
@ -196,54 +222,64 @@ public synchronized void setNeededResources(Resource reserved_resources) {
@Override
public synchronized void setMemorySeconds(long memory_seconds) {
maybeInitBuilder();
builder.setMemorySeconds(memory_seconds);
getResourceSecondsMap()
.put(ResourceInformation.MEMORY_MB.getName(), memory_seconds);
}
@Override
public synchronized long getMemorySeconds() {
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getMemorySeconds();
Map<String, Long> tmp = getResourceSecondsMap();
if (tmp.containsKey(ResourceInformation.MEMORY_MB.getName())) {
return tmp.get(ResourceInformation.MEMORY_MB.getName());
}
return 0;
}
@Override
public synchronized void setVcoreSeconds(long vcore_seconds) {
maybeInitBuilder();
builder.setVcoreSeconds(vcore_seconds);
getResourceSecondsMap()
.put(ResourceInformation.VCORES.getName(), vcore_seconds);
}
@Override
public synchronized long getVcoreSeconds() {
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getVcoreSeconds());
Map<String, Long> tmp = getResourceSecondsMap();
if (tmp.containsKey(ResourceInformation.VCORES.getName())) {
return tmp.get(ResourceInformation.VCORES.getName());
}
return 0;
}
@Override
public synchronized void setPreemptedMemorySeconds(
long preemptedMemorySeconds) {
maybeInitBuilder();
builder.setPreemptedMemorySeconds(preemptedMemorySeconds);
getPreemptedResourceSecondsMap()
.put(ResourceInformation.MEMORY_MB.getName(), preemptedMemorySeconds);
}
@Override
public synchronized long getPreemptedMemorySeconds() {
ApplicationResourceUsageReportProtoOrBuilder p =
viaProto ? proto : builder;
return p.getPreemptedMemorySeconds();
Map<String, Long> tmp = getPreemptedResourceSecondsMap();
if (tmp.containsKey(ResourceInformation.MEMORY_MB.getName())) {
return tmp.get(ResourceInformation.MEMORY_MB.getName());
}
return 0;
}
@Override
public synchronized void setPreemptedVcoreSeconds(
long vcoreSeconds) {
maybeInitBuilder();
builder.setPreemptedVcoreSeconds(vcoreSeconds);
getPreemptedResourceSecondsMap()
.put(ResourceInformation.VCORES.getName(), vcoreSeconds);
}
@Override
public synchronized long getPreemptedVcoreSeconds() {
ApplicationResourceUsageReportProtoOrBuilder p =
viaProto ? proto : builder;
return (p.getPreemptedVcoreSeconds());
Map<String, Long> tmp = getPreemptedResourceSecondsMap();
if (tmp.containsKey(ResourceInformation.VCORES.getName())) {
return tmp.get(ResourceInformation.VCORES.getName());
}
return 0;
}
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
@ -277,4 +313,81 @@ public synchronized void setClusterUsagePercentage(float clusterUsagePerc) {
maybeInitBuilder();
builder.setClusterUsagePercentage((clusterUsagePerc));
}
@Override
public synchronized void setResourceSecondsMap(
Map<String, Long> resourceSecondsMap) {
this.resourceSecondsMap = resourceSecondsMap;
if (resourceSecondsMap == null) {
return;
}
if (!resourceSecondsMap
.containsKey(ResourceInformation.MEMORY_MB.getName())) {
this.setMemorySeconds(0L);
}
if (!resourceSecondsMap.containsKey(ResourceInformation.VCORES.getName())) {
this.setVcoreSeconds(0L);
}
}
@Override
public synchronized Map<String, Long> getResourceSecondsMap() {
if (this.resourceSecondsMap != null) {
return this.resourceSecondsMap;
}
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
this.resourceSecondsMap = ProtoUtils
.convertStringLongMapProtoListToMap(
p.getApplicationResourceUsageMapList());
if (!this.resourceSecondsMap
.containsKey(ResourceInformation.MEMORY_MB.getName())) {
this.setMemorySeconds(p.getMemorySeconds());
}
if (!this.resourceSecondsMap
.containsKey(ResourceInformation.VCORES.getName())) {
this.setVcoreSeconds(p.getVcoreSeconds());
}
this.setMemorySeconds(p.getMemorySeconds());
this.setVcoreSeconds(p.getVcoreSeconds());
return this.resourceSecondsMap;
}
@Override
public synchronized void setPreemptedResourceSecondsMap(
Map<String, Long> preemptedResourceSecondsMap) {
this.preemptedResourceSecondsMap = preemptedResourceSecondsMap;
if (preemptedResourceSecondsMap == null) {
return;
}
if (!preemptedResourceSecondsMap
.containsKey(ResourceInformation.MEMORY_MB.getName())) {
this.setPreemptedMemorySeconds(0L);
}
if (!preemptedResourceSecondsMap
.containsKey(ResourceInformation.VCORES.getName())) {
this.setPreemptedVcoreSeconds(0L);
}
}
@Override
public synchronized Map<String, Long> getPreemptedResourceSecondsMap() {
if (this.preemptedResourceSecondsMap != null) {
return this.preemptedResourceSecondsMap;
}
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
this.preemptedResourceSecondsMap = ProtoUtils
.convertStringLongMapProtoListToMap(
p.getApplicationPreemptedResourceUsageMapList());
if (!this.preemptedResourceSecondsMap
.containsKey(ResourceInformation.MEMORY_MB.getName())) {
this.setPreemptedMemorySeconds(p.getPreemptedMemorySeconds());
}
if (!this.preemptedResourceSecondsMap
.containsKey(ResourceInformation.VCORES.getName())) {
this.setPreemptedVcoreSeconds(p.getPreemptedVcoreSeconds());
}
this.setPreemptedMemorySeconds(p.getPreemptedMemorySeconds());
this.setPreemptedVcoreSeconds(p.getPreemptedVcoreSeconds());
return this.preemptedResourceSecondsMap;
}
}

View File

@ -19,6 +19,10 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -45,6 +49,7 @@
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
@ -446,6 +451,35 @@ public static ResourceTypesProto converToProtoFormat(ResourceTypes e) {
public static ResourceTypes convertFromProtoFormat(ResourceTypesProto e) {
return ResourceTypes.valueOf(e.name());
}
public static Map<String, Long> convertStringLongMapProtoListToMap(
List<YarnProtos.StringLongMapProto> pList) {
Resource tmp = Resource.newInstance(0, 0);
Map<String, Long> ret = new HashMap<>();
for (Map.Entry<String, ResourceInformation> entry : tmp.getResources()
.entrySet()) {
ret.put(entry.getKey(), 0L);
}
if (pList != null) {
for (YarnProtos.StringLongMapProto p : pList) {
ret.put(p.getKey(), p.getValue());
}
}
return ret;
}
public static List<YarnProtos.StringLongMapProto> convertMapToStringLongMapProtoList(
Map<String, Long> map) {
List<YarnProtos.StringLongMapProto> ret = new ArrayList<>();
for (Map.Entry<String, Long> entry : map.entrySet()) {
YarnProtos.StringLongMapProto.Builder tmp =
YarnProtos.StringLongMapProto.newBuilder();
tmp.setKey(entry.getKey());
tmp.setValue(entry.getValue());
ret.add(tmp.build());
}
return ret;
}
}

View File

@ -20,9 +20,15 @@
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
/**
* Common string manipulation helpers
@ -174,4 +180,34 @@ private static void uappend(StringBuilder sb, String part) {
}
sb.append(part);
}
public static String getResourceSecondsString(Map<String, Long> targetMap) {
List<String> strings = new ArrayList<>(targetMap.size());
//completed app report in the timeline server doesn't have usage report
Long memorySeconds = 0L;
Long vcoreSeconds = 0L;
if (targetMap.containsKey(ResourceInformation.MEMORY_MB.getName())) {
memorySeconds = targetMap.get(ResourceInformation.MEMORY_MB.getName());
}
if (targetMap.containsKey(ResourceInformation.VCORES.getName())) {
vcoreSeconds = targetMap.get(ResourceInformation.VCORES.getName());
}
strings.add(memorySeconds + " MB-seconds");
strings.add(vcoreSeconds + " vcore-seconds");
Map<String, ResourceInformation> tmp = ResourceUtils.getResourceTypes();
if (targetMap.size() > 2) {
for (Map.Entry<String, Long> entry : targetMap.entrySet()) {
if (!entry.getKey().equals(ResourceInformation.MEMORY_MB.getName())
&& !entry.getKey().equals(ResourceInformation.VCORES.getName())) {
String units = "";
if (tmp.containsKey(entry.getKey())) {
units = tmp.get(entry.getKey()).getUnits();
}
strings.add(entry.getValue() + " " + entry.getKey() + "-" + units
+ "seconds");
}
}
}
return String.join(", ", strings);
}
}

View File

@ -37,6 +37,9 @@ public class BasePBImplRecordsTest {
@SuppressWarnings("checkstyle:visibilitymodifier")
protected static HashMap<Type, Object> typeValueCache =
new HashMap<Type, Object>();
@SuppressWarnings("checkstyle:visibilitymodifier")
protected static HashMap<Type, List<String>> excludedPropertiesMap =
new HashMap<>();
private static Random rand = new Random();
private static byte [] bytes = new byte[] {'1', '2', '3', '4'};
@ -167,6 +170,10 @@ public String toString() {
private <R> Map<String, GetSetPair> getGetSetPairs(Class<R> recordClass)
throws Exception {
Map<String, GetSetPair> ret = new HashMap<String, GetSetPair>();
List<String> excluded = null;
if (excludedPropertiesMap.containsKey(recordClass.getClass())) {
excluded = excludedPropertiesMap.get(recordClass.getClass());
}
Method [] methods = recordClass.getDeclaredMethods();
// get all get methods
for (int i = 0; i < methods.length; i++) {
@ -224,6 +231,11 @@ private <R> Map<String, GetSetPair> getGetSetPairs(Class<R> recordClass)
(gsp.setMethod == null)) {
LOG.info(String.format("Exclude potential property: %s\n", gsp.propertyName));
itr.remove();
} else if ((excluded != null && excluded.contains(gsp.propertyName))) {
LOG.info(String.format(
"Excluding potential property(present in exclusion list): %s\n",
gsp.propertyName));
itr.remove();
} else {
LOG.info(String.format("New property: %s type: %s", gsp.toString(), gsp.type));
gsp.testValue = genTypeValue(gsp.type);

View File

@ -343,6 +343,8 @@
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Arrays;
/**
* Test class for YARN API protocol records.
@ -745,6 +747,8 @@ public void testApplicationReportPBImpl() throws Exception {
@Test
public void testApplicationResourceUsageReportPBImpl() throws Exception {
excludedPropertiesMap.put(ApplicationResourceUsageReportPBImpl.class.getClass(),
Arrays.asList("PreemptedResourceSecondsMap", "ResourceSecondsMap"));
validatePBImplRecord(ApplicationResourceUsageReportPBImpl.class,
ApplicationResourceUsageReportProto.class);
}

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@ -338,9 +339,20 @@ private static ApplicationReportExt convertToApplicationReport(
ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS);
long preemptedVcoreSeconds = parseLong(entityInfo,
ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS);
appResources = ApplicationResourceUsageReport.newInstance(0, 0, null,
null, null, memorySeconds, vcoreSeconds, 0, 0,
preemptedMemorySeconds, preemptedVcoreSeconds);
Map<String, Long> resourceSecondsMap = new HashMap<>();
Map<String, Long> preemptedResoureSecondsMap = new HashMap<>();
resourceSecondsMap
.put(ResourceInformation.MEMORY_MB.getName(), memorySeconds);
resourceSecondsMap
.put(ResourceInformation.VCORES.getName(), vcoreSeconds);
preemptedResoureSecondsMap.put(ResourceInformation.MEMORY_MB.getName(),
preemptedMemorySeconds);
preemptedResoureSecondsMap
.put(ResourceInformation.VCORES.getName(), preemptedVcoreSeconds);
appResources = ApplicationResourceUsageReport
.newInstance(0, 0, null, null, null, resourceSecondsMap, 0, 0,
preemptedResoureSecondsMap);
}
if (entityInfo.containsKey(ApplicationMetricsConstants.APP_TAGS_INFO)) {

View File

@ -65,8 +65,6 @@
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
@ -447,12 +445,12 @@ public static ApplicationSubmissionContext newApplicationSubmissionContext(
queue, priority, amContainer, isUnmanagedAM, cancelTokensWhenComplete,
maxAppAttempts, resource, null);
}
public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
int numUsedContainers, int numReservedContainers, Resource usedResources,
Resource reservedResources, Resource neededResources, long memorySeconds,
long vcoreSeconds, long preemptedMemorySeconds,
long preemptedVcoreSeconds) {
Resource reservedResources, Resource neededResources,
Map<String, Long> resourceSecondsMap,
Map<String, Long> preemptedResourceSecondsMap) {
ApplicationResourceUsageReport report =
recordFactory.newRecordInstance(ApplicationResourceUsageReport.class);
report.setNumUsedContainers(numUsedContainers);
@ -460,10 +458,8 @@ public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
report.setUsedResources(usedResources);
report.setReservedResources(reservedResources);
report.setNeededResources(neededResources);
report.setMemorySeconds(memorySeconds);
report.setVcoreSeconds(vcoreSeconds);
report.setPreemptedMemorySeconds(preemptedMemorySeconds);
report.setPreemptedVcoreSeconds(preemptedVcoreSeconds);
report.setResourceSecondsMap(resourceSecondsMap);
report.setPreemptedResourceSecondsMap(preemptedResourceSecondsMap);
return report;
}

View File

@ -70,6 +70,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.hadoop.yarn.util.StringHelper;
/**
* This class manages the list of applications for the resource manager.
@ -189,7 +190,12 @@ public static SummaryBuilder createAppSummary(RMApp app) {
.add("preemptedAMContainers", metrics.getNumAMContainersPreempted())
.add("preemptedNonAMContainers", metrics.getNumNonAMContainersPreempted())
.add("preemptedResources", metrics.getResourcePreempted())
.add("applicationType", app.getApplicationType());
.add("applicationType", app.getApplicationType())
.add("resourceSeconds", StringHelper
.getResourceSecondsString(metrics.getResourceSecondsMap()))
.add("preemptedResourceSeconds", StringHelper
.getResourceSecondsString(
metrics.getPreemptedResourceSecondsMap()));
return summary;
}

View File

@ -483,7 +483,7 @@ public static YarnApplicationAttemptState createApplicationAttemptState(
DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
BuilderUtils.newApplicationResourceUsageReport(-1, -1,
Resources.createResource(-1, -1), Resources.createResource(-1, -1),
Resources.createResource(-1, -1), 0, 0, 0, 0);
Resources.createResource(-1, -1), new HashMap<>(), new HashMap<>());
/**
@ -672,4 +672,12 @@ public static void convertProfileToResourceCapability(ResourceRequest ask,
.debug("Converted profile to resource capability for ask " + ask);
}
}
public static Long getOrDefault(Map<String, Long> map, String key,
Long defaultValue) {
if (map.containsKey(key)) {
return map.get(key);
}
return defaultValue;
}
}

View File

@ -851,11 +851,8 @@ public void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(),
credentials, appAttempt.getStartTime(),
resUsage.getMemorySeconds(),
resUsage.getVcoreSeconds(),
attempMetrics.getPreemptedMemory(),
attempMetrics.getPreemptedVcore()
);
resUsage.getResourceUsageSecondsMap(),
attempMetrics.getPreemptedResourceSecondsMap());
getRMStateStoreEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));

View File

@ -25,23 +25,28 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
import java.util.Map;
/*
* Contains the state data that needs to be persisted for an ApplicationAttempt
*/
@Public
@Unstable
public abstract class ApplicationAttemptStateData {
public static ApplicationAttemptStateData newInstance(
ApplicationAttemptId attemptId, Container container,
Credentials attemptTokens, long startTime, RMAppAttemptState finalState,
String finalTrackingUrl, String diagnostics,
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
long finishTime, long memorySeconds, long vcoreSeconds,
long preemptedMemorySeconds, long preemptedVcoreSeconds) {
long finishTime, Map<String, Long> resourceSecondsMap,
Map<String, Long> preemptedResourceSecondsMap) {
ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
@ -54,23 +59,33 @@ public static ApplicationAttemptStateData newInstance(
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus);
attemptStateData.setFinishTime(finishTime);
attemptStateData.setMemorySeconds(memorySeconds);
attemptStateData.setVcoreSeconds(vcoreSeconds);
attemptStateData.setPreemptedMemorySeconds(preemptedMemorySeconds);
attemptStateData.setPreemptedVcoreSeconds(preemptedVcoreSeconds);
attemptStateData.setMemorySeconds(RMServerUtils
.getOrDefault(resourceSecondsMap,
ResourceInformation.MEMORY_MB.getName(), 0L));
attemptStateData.setVcoreSeconds(RMServerUtils
.getOrDefault(resourceSecondsMap, ResourceInformation.VCORES.getName(),
0L));
attemptStateData.setPreemptedMemorySeconds(RMServerUtils
.getOrDefault(preemptedResourceSecondsMap,
ResourceInformation.MEMORY_MB.getName(), 0L));
attemptStateData.setPreemptedVcoreSeconds(RMServerUtils
.getOrDefault(preemptedResourceSecondsMap,
ResourceInformation.VCORES.getName(), 0L));
attemptStateData.setResourceSecondsMap(resourceSecondsMap);
attemptStateData
.setPreemptedResourceSecondsMap(preemptedResourceSecondsMap);
return attemptStateData;
}
public static ApplicationAttemptStateData newInstance(
ApplicationAttemptId attemptId, Container masterContainer,
Credentials attemptTokens, long startTime, long memorySeconds,
long vcoreSeconds, long preemptedMemorySeconds,
long preemptedVcoreSeconds) {
return newInstance(attemptId, masterContainer, attemptTokens,
startTime, null, "N/A", "", null, ContainerExitStatus.INVALID, 0,
memorySeconds, vcoreSeconds,
preemptedMemorySeconds, preemptedVcoreSeconds);
}
Credentials attemptTokens, long startTime,
Map<String, Long> resourceSeondsMap,
Map<String, Long> preemptedResourceSecondsMap) {
return newInstance(attemptId, masterContainer, attemptTokens, startTime,
null, "N/A", "", null, ContainerExitStatus.INVALID, 0,
resourceSeondsMap, preemptedResourceSecondsMap);
}
public abstract ApplicationAttemptStateDataProto getProto();
@ -215,4 +230,50 @@ public abstract void setFinalApplicationStatus(
@Public
@Unstable
public abstract void setPreemptedVcoreSeconds(long vcoreSeconds);
/**
* Get the aggregated number of resources preempted that the application has
* allocated times the number of seconds the application has been running.
*
* @return map containing the resource name and aggregated preempted
* resource-seconds
*/
@Public
@Unstable
public abstract Map<String, Long> getResourceSecondsMap();
/**
* Set the aggregated number of resources that the application has
* allocated times the number of seconds the application has been running.
*
* @param resourceSecondsMap map containing the resource name and aggregated
* resource-seconds
*/
@Public
@Unstable
public abstract void setResourceSecondsMap(
Map<String, Long> resourceSecondsMap);
/**
* Get the aggregated number of resources preempted that the application has
* allocated times the number of seconds the application has been running.
*
* @return map containing the resource name and aggregated preempted
* resource-seconds
*/
@Public
@Unstable
public abstract Map<String, Long> getPreemptedResourceSecondsMap();
/**
* Set the aggregated number of resources preempted that the application has
* allocated times the number of seconds the application has been running.
*
* @param preemptedResourceSecondsMap map containing the resource name and
* aggregated preempted resource-seconds
*/
@Public
@Unstable
public abstract void setPreemptedResourceSecondsMap(
Map<String, Long> preemptedResourceSecondsMap);
}

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -55,6 +56,9 @@ public class ApplicationAttemptStateDataPBImpl extends
private Container masterContainer = null;
private ByteBuffer appAttemptTokens = null;
private Map<String, Long> resourceSecondsMap;
private Map<String, Long> preemptedResourceSecondsMap;
public ApplicationAttemptStateDataPBImpl() {
builder = ApplicationAttemptStateDataProto.newBuilder();
}
@ -404,4 +408,50 @@ private static Credentials convertCredentialsFromByteBuffer(
IOUtils.closeStream(dibb);
}
}
@Override
public Map<String, Long> getResourceSecondsMap() {
if (this.resourceSecondsMap != null) {
return this.resourceSecondsMap;
}
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
this.resourceSecondsMap = ProtoUtils.convertStringLongMapProtoListToMap(
p.getApplicationResourceUsageMapList());
return this.resourceSecondsMap;
}
@Override
public void setResourceSecondsMap(Map<String, Long> resourceSecondsMap) {
maybeInitBuilder();
builder.clearApplicationResourceUsageMap();
this.resourceSecondsMap = resourceSecondsMap;
if (resourceSecondsMap != null) {
builder.addAllApplicationResourceUsageMap(
ProtoUtils.convertMapToStringLongMapProtoList(resourceSecondsMap));
}
}
@Override
public Map<String, Long> getPreemptedResourceSecondsMap() {
if (this.preemptedResourceSecondsMap != null) {
return this.preemptedResourceSecondsMap;
}
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
this.preemptedResourceSecondsMap = ProtoUtils
.convertStringLongMapProtoListToMap(
p.getApplicationResourceUsageMapList());
return this.preemptedResourceSecondsMap;
}
@Override
public void setPreemptedResourceSecondsMap(
Map<String, Long> preemptedResourceSecondsMap) {
maybeInitBuilder();
builder.clearPreemptedResourceUsageMap();
this.preemptedResourceSecondsMap = preemptedResourceSecondsMap;
if (preemptedResourceSecondsMap != null) {
builder.addAllPreemptedResourceUsageMap(ProtoUtils
.convertMapToStringLongMapProtoList(preemptedResourceSecondsMap));
}
}
}

View File

@ -748,14 +748,10 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
}
RMAppMetrics rmAppMetrics = getRMAppMetrics();
appUsageReport.setMemorySeconds(rmAppMetrics.getMemorySeconds());
appUsageReport.setVcoreSeconds(rmAppMetrics.getVcoreSeconds());
appUsageReport.
setPreemptedMemorySeconds(rmAppMetrics.
getPreemptedMemorySeconds());
appUsageReport.
setPreemptedVcoreSeconds(rmAppMetrics.
getPreemptedVcoreSeconds());
appUsageReport
.setResourceSecondsMap(rmAppMetrics.getResourceSecondsMap());
appUsageReport.setPreemptedResourceSecondsMap(
rmAppMetrics.getPreemptedResourceSecondsMap());
}
if (currentApplicationAttemptId == null) {
@ -1630,10 +1626,9 @@ public RMAppMetrics getRMAppMetrics() {
Resource resourcePreempted = Resource.newInstance(0, 0);
int numAMContainerPreempted = 0;
int numNonAMContainerPreempted = 0;
long memorySeconds = 0;
long vcoreSeconds = 0;
long preemptedMemorySeconds = 0;
long preemptedVcoreSeconds = 0;
Map<String, Long> resourceSecondsMap = new HashMap<>();
Map<String, Long> preemptedSecondsMap = new HashMap<>();
for (RMAppAttempt attempt : attempts.values()) {
if (null != attempt) {
RMAppAttemptMetrics attemptMetrics =
@ -1647,17 +1642,25 @@ public RMAppMetrics getRMAppMetrics() {
// for both running and finished containers.
AggregateAppResourceUsage resUsage =
attempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
memorySeconds += resUsage.getMemorySeconds();
vcoreSeconds += resUsage.getVcoreSeconds();
preemptedMemorySeconds += attemptMetrics.getPreemptedMemory();
preemptedVcoreSeconds += attemptMetrics.getPreemptedVcore();
for (Map.Entry<String, Long> entry : resUsage
.getResourceUsageSecondsMap().entrySet()) {
long value = RMServerUtils
.getOrDefault(resourceSecondsMap, entry.getKey(), 0L);
value += entry.getValue();
resourceSecondsMap.put(entry.getKey(), value);
}
for (Map.Entry<String, Long> entry : attemptMetrics
.getPreemptedResourceSecondsMap().entrySet()) {
long value = RMServerUtils
.getOrDefault(preemptedSecondsMap, entry.getKey(), 0L);
value += entry.getValue();
preemptedSecondsMap.put(entry.getKey(), value);
}
}
}
return new RMAppMetrics(resourcePreempted,
numNonAMContainerPreempted, numAMContainerPreempted,
memorySeconds, vcoreSeconds,
preemptedMemorySeconds, preemptedVcoreSeconds);
return new RMAppMetrics(resourcePreempted, numNonAMContainerPreempted,
numAMContainerPreempted, resourceSecondsMap, preemptedSecondsMap);
}
@Private

View File

@ -19,27 +19,27 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import java.util.Map;
public class RMAppMetrics {
final Resource resourcePreempted;
final int numNonAMContainersPreempted;
final int numAMContainersPreempted;
final long memorySeconds;
final long vcoreSeconds;
private final long preemptedMemorySeconds;
private final long preemptedVcoreSeconds;
private final Map<String, Long> resourceSecondsMap;
private final Map<String, Long> preemptedResourceSecondsMap;
public RMAppMetrics(Resource resourcePreempted,
int numNonAMContainersPreempted, int numAMContainersPreempted,
long memorySeconds, long vcoreSeconds, long preemptedMemorySeconds,
long preemptedVcoreSeconds) {
Map<String, Long> resourceSecondsMap,
Map<String, Long> preemptedResourceSecondsMap) {
this.resourcePreempted = resourcePreempted;
this.numNonAMContainersPreempted = numNonAMContainersPreempted;
this.numAMContainersPreempted = numAMContainersPreempted;
this.memorySeconds = memorySeconds;
this.vcoreSeconds = vcoreSeconds;
this.preemptedMemorySeconds = preemptedMemorySeconds;
this.preemptedVcoreSeconds = preemptedVcoreSeconds;
this.resourceSecondsMap = resourceSecondsMap;
this.preemptedResourceSecondsMap = preemptedResourceSecondsMap;
}
public Resource getResourcePreempted() {
@ -55,19 +55,32 @@ public int getNumAMContainersPreempted() {
}
public long getMemorySeconds() {
return memorySeconds;
return RMServerUtils.getOrDefault(resourceSecondsMap,
ResourceInformation.MEMORY_MB.getName(), 0L);
}
public long getVcoreSeconds() {
return vcoreSeconds;
return RMServerUtils
.getOrDefault(resourceSecondsMap, ResourceInformation.VCORES.getName(),
0L);
}
public long getPreemptedMemorySeconds() {
return preemptedMemorySeconds;
return RMServerUtils.getOrDefault(preemptedResourceSecondsMap,
ResourceInformation.MEMORY_MB.getName(), 0L);
}
public long getPreemptedVcoreSeconds() {
return preemptedVcoreSeconds;
return RMServerUtils.getOrDefault(preemptedResourceSecondsMap,
ResourceInformation.VCORES.getName(), 0L);
}
public Map<String, Long> getResourceSecondsMap() {
return resourceSecondsMap;
}
public Map<String, Long> getPreemptedResourceSecondsMap() {
return preemptedResourceSecondsMap;
}
}

View File

@ -19,42 +19,38 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import java.util.HashMap;
import java.util.Map;
@Private
public class AggregateAppResourceUsage {
long memorySeconds;
long vcoreSeconds;
private Map<String, Long> resourceSecondsMap = new HashMap<>();
public AggregateAppResourceUsage(long memorySeconds, long vcoreSeconds) {
this.memorySeconds = memorySeconds;
this.vcoreSeconds = vcoreSeconds;
public AggregateAppResourceUsage(Map<String, Long> resourceSecondsMap) {
this.resourceSecondsMap.putAll(resourceSecondsMap);
}
/**
* @return the memorySeconds
*/
public long getMemorySeconds() {
return memorySeconds;
}
/**
* @param memorySeconds the memorySeconds to set
*/
public void setMemorySeconds(long memorySeconds) {
this.memorySeconds = memorySeconds;
return RMServerUtils.getOrDefault(resourceSecondsMap,
ResourceInformation.MEMORY_MB.getName(), 0L);
}
/**
* @return the vcoreSeconds
*/
public long getVcoreSeconds() {
return vcoreSeconds;
return RMServerUtils
.getOrDefault(resourceSecondsMap, ResourceInformation.VCORES.getName(),
0L);
}
/**
* @param vcoreSeconds the vcoreSeconds to set
*/
public void setVcoreSeconds(long vcoreSeconds) {
this.vcoreSeconds = vcoreSeconds;
public Map<String, Long> getResourceUsageSecondsMap() {
return resourceSecondsMap;
}
}

View File

@ -953,12 +953,9 @@ public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
}
AggregateAppResourceUsage resUsage =
this.attemptMetrics.getAggregateAppResourceUsage();
report.setMemorySeconds(resUsage.getMemorySeconds());
report.setVcoreSeconds(resUsage.getVcoreSeconds());
report.setPreemptedMemorySeconds(
this.attemptMetrics.getPreemptedMemory());
report.setPreemptedVcoreSeconds(
this.attemptMetrics.getPreemptedVcore());
report.setResourceSecondsMap(resUsage.getResourceUsageSecondsMap());
report.setPreemptedResourceSecondsMap(
this.attemptMetrics.getPreemptedResourceSecondsMap());
return report;
} finally {
this.readLock.unlock();
@ -995,11 +992,10 @@ public void recover(RMState state) {
this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime();
this.finishTime = attemptState.getFinishTime();
this.attemptMetrics.updateAggregateAppResourceUsage(
attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
this.attemptMetrics
.updateAggregateAppResourceUsage(attemptState.getResourceSecondsMap());
this.attemptMetrics.updateAggregatePreemptedAppResourceUsage(
attemptState.getPreemptedMemorySeconds(),
attemptState.getPreemptedVcoreSeconds());
attemptState.getPreemptedResourceSecondsMap());
}
public void transferStateFromAttempt(RMAppAttempt attempt) {
@ -1375,16 +1371,12 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
RMStateStore rmStore = rmContext.getStateStore();
setFinishTime(System.currentTimeMillis());
ApplicationAttemptStateData attemptState =
ApplicationAttemptStateData.newInstance(
applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this),
startTime, stateToBeStored, finalTrackingUrl, diags.toString(),
finalStatus, exitStatus,
getFinishTime(), resUsage.getMemorySeconds(),
resUsage.getVcoreSeconds(),
this.attemptMetrics.getPreemptedMemory(),
this.attemptMetrics.getPreemptedVcore());
ApplicationAttemptStateData attemptState = ApplicationAttemptStateData
.newInstance(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime,
stateToBeStored, finalTrackingUrl, diags.toString(), finalStatus, exitStatus,
getFinishTime(), resUsage.getResourceUsageSecondsMap(),
this.attemptMetrics.getPreemptedResourceSecondsMap());
LOG.info("Updating application attempt " + applicationAttemptId
+ " with final state: " + targetedFinalState + ", and exit status: "
+ exitStatus);

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -25,11 +27,13 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -49,10 +53,8 @@ public class RMAppAttemptMetrics {
private ReadLock readLock;
private WriteLock writeLock;
private AtomicLong finishedMemorySeconds = new AtomicLong(0);
private AtomicLong finishedVcoreSeconds = new AtomicLong(0);
private AtomicLong preemptedMemorySeconds = new AtomicLong(0);
private AtomicLong preemptedVcoreSeconds = new AtomicLong(0);
private Map<String, AtomicLong> resourceUsageMap = new HashMap<>();
private Map<String, AtomicLong> preemptedResourceMap = new HashMap<>();
private RMContext rmContext;
private int[][] localityStatistics =
@ -102,11 +104,16 @@ public Resource getResourcePreempted() {
}
public long getPreemptedMemory() {
return preemptedMemorySeconds.get();
return preemptedResourceMap.get(ResourceInformation.MEMORY_MB.getName())
.get();
}
public long getPreemptedVcore() {
return preemptedVcoreSeconds.get();
return preemptedResourceMap.get(ResourceInformation.VCORES.getName()).get();
}
public Map<String, Long> getPreemptedResourceSecondsMap() {
return convertAtomicLongMaptoLongMap(preemptedResourceMap);
}
public int getNumNonAMContainersPreempted() {
@ -122,35 +129,90 @@ public boolean getIsPreempted() {
}
public AggregateAppResourceUsage getAggregateAppResourceUsage() {
long memorySeconds = finishedMemorySeconds.get();
long vcoreSeconds = finishedVcoreSeconds.get();
Map<String, Long> resourcesUsed =
convertAtomicLongMaptoLongMap(resourceUsageMap);
// Only add in the running containers if this is the active attempt.
RMApp rmApp = rmContext.getRMApps().get(attemptId.getApplicationId());
if (null != rmApp) {
RMAppAttempt currentAttempt = rmApp.getCurrentAppAttempt();
if (rmApp != null) {
RMAppAttempt currentAttempt = rmContext.getRMApps().get(attemptId.getApplicationId()).getCurrentAppAttempt();
if (currentAttempt.getAppAttemptId().equals(attemptId)) {
ApplicationResourceUsageReport appResUsageReport = rmContext
.getScheduler().getAppResourceUsageReport(attemptId);
ApplicationResourceUsageReport appResUsageReport =
rmContext.getScheduler().getAppResourceUsageReport(attemptId);
if (appResUsageReport != null) {
memorySeconds += appResUsageReport.getMemorySeconds();
vcoreSeconds += appResUsageReport.getVcoreSeconds();
Map<String, Long> tmp = appResUsageReport.getResourceSecondsMap();
for (Map.Entry<String, Long> entry : tmp.entrySet()) {
if (resourcesUsed.containsKey(entry.getKey())) {
Long value = resourcesUsed.get(entry.getKey());
value += entry.getValue();
resourcesUsed.put(entry.getKey(), value);
} else{
resourcesUsed.put(entry.getKey(), entry.getValue());
}
}
}
}
}
return new AggregateAppResourceUsage(memorySeconds, vcoreSeconds);
return new AggregateAppResourceUsage(resourcesUsed);
}
public void updateAggregateAppResourceUsage(long finishedMemorySeconds,
long finishedVcoreSeconds) {
this.finishedMemorySeconds.addAndGet(finishedMemorySeconds);
this.finishedVcoreSeconds.addAndGet(finishedVcoreSeconds);
public void updateAggregateAppResourceUsage(Resource allocated,
long deltaUsedMillis) {
updateUsageMap(allocated, deltaUsedMillis, resourceUsageMap);
}
public void updateAggregatePreemptedAppResourceUsage(Resource allocated,
long deltaUsedMillis) {
updateUsageMap(allocated, deltaUsedMillis, preemptedResourceMap);
}
public void updateAggregateAppResourceUsage(
Map<String, Long> resourceSecondsMap) {
updateUsageMap(resourceSecondsMap, resourceUsageMap);
}
public void updateAggregatePreemptedAppResourceUsage(
long preemptedMemorySeconds, long preemptedVcoreSeconds) {
this.preemptedMemorySeconds.addAndGet(preemptedMemorySeconds);
this.preemptedVcoreSeconds.addAndGet(preemptedVcoreSeconds);
Map<String, Long> preemptedResourceSecondsMap) {
updateUsageMap(preemptedResourceSecondsMap, preemptedResourceMap);
}
private void updateUsageMap(Resource allocated, long deltaUsedMillis,
Map<String, AtomicLong> targetMap) {
for (Map.Entry<String, ResourceInformation> entry : allocated.getResources()
.entrySet()) {
AtomicLong resourceUsed;
if (!targetMap.containsKey(entry.getKey())) {
resourceUsed = new AtomicLong(0);
targetMap.put(entry.getKey(), resourceUsed);
}
resourceUsed = targetMap.get(entry.getKey());
resourceUsed.addAndGet((entry.getValue().getValue() * deltaUsedMillis)
/ DateUtils.MILLIS_PER_SECOND);
}
}
private void updateUsageMap(Map<String, Long> sourceMap,
Map<String, AtomicLong> targetMap) {
for (Map.Entry<String, Long> entry : sourceMap.entrySet()) {
AtomicLong resourceUsed;
if (!targetMap.containsKey(entry.getKey())) {
resourceUsed = new AtomicLong(0);
targetMap.put(entry.getKey(), resourceUsed);
}
resourceUsed = targetMap.get(entry.getKey());
resourceUsed.set(entry.getValue());
}
}
private Map<String, Long> convertAtomicLongMaptoLongMap(
Map<String, AtomicLong> source) {
Map<String, Long> ret = new HashMap<>();
for (Map.Entry<String, AtomicLong> entry : source.entrySet()) {
ret.put(entry.getKey(), entry.getValue().get());
}
return ret;
}
public void incNumAllocatedContainers(NodeType containerType,

View File

@ -25,7 +25,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -715,20 +714,15 @@ private static void updateAttemptMetrics(RMContainerImpl container) {
if (rmAttempt != null) {
long usedMillis = container.finishTime - container.creationTime;
long memorySeconds = resource.getMemorySize()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
long vcoreSeconds = resource.getVirtualCores()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
rmAttempt.getRMAppAttemptMetrics()
.updateAggregateAppResourceUsage(memorySeconds,vcoreSeconds);
.updateAggregateAppResourceUsage(resource, usedMillis);
// If this is a preempted container, update preemption metrics
if (ContainerExitStatus.PREEMPTED == container.finishedStatus
.getExitStatus()) {
rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
container);
.getExitStatus()) {
rmAttempt.getRMAppAttemptMetrics()
.updateAggregatePreemptedAppResourceUsage(memorySeconds,
vcoreSeconds);
.updatePreemptionInfo(resource, container);
rmAttempt.getRMAppAttemptMetrics()
.updateAggregatePreemptedAppResourceUsage(resource, usedMillis);
}
}
}

View File

@ -55,11 +55,13 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
@ -107,9 +109,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private static final long MEM_AGGREGATE_ALLOCATION_CACHE_MSECS = 3000;
protected long lastMemoryAggregateAllocationUpdateTime = 0;
private long lastMemorySeconds = 0;
private long lastVcoreSeconds = 0;
private Map<String, Long> lastResourceSecondsMap = new HashMap<>();
protected final AppSchedulingInfo appSchedulingInfo;
protected ApplicationAttemptId attemptId;
protected Map<ContainerId, RMContainer> liveContainers =
@ -1002,22 +1002,24 @@ private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
// recently.
if ((currentTimeMillis - lastMemoryAggregateAllocationUpdateTime)
> MEM_AGGREGATE_ALLOCATION_CACHE_MSECS) {
long memorySeconds = 0;
long vcoreSeconds = 0;
Map<String, Long> resourceSecondsMap = new HashMap<>();
for (RMContainer rmContainer : this.liveContainers.values()) {
long usedMillis = currentTimeMillis - rmContainer.getCreationTime();
Resource resource = rmContainer.getContainer().getResource();
memorySeconds += resource.getMemorySize() * usedMillis /
DateUtils.MILLIS_PER_SECOND;
vcoreSeconds += resource.getVirtualCores() * usedMillis
/ DateUtils.MILLIS_PER_SECOND;
for (Map.Entry<String, ResourceInformation> entry : resource
.getResources().entrySet()) {
long value = RMServerUtils
.getOrDefault(resourceSecondsMap, entry.getKey(), 0L);
value += entry.getValue().getValue() * usedMillis
/ DateUtils.MILLIS_PER_SECOND;
resourceSecondsMap.put(entry.getKey(), value);
}
}
lastMemoryAggregateAllocationUpdateTime = currentTimeMillis;
lastMemorySeconds = memorySeconds;
lastVcoreSeconds = vcoreSeconds;
lastResourceSecondsMap = resourceSecondsMap;
}
return new AggregateAppResourceUsage(lastMemorySeconds, lastVcoreSeconds);
return new AggregateAppResourceUsage(lastResourceSecondsMap);
}
public ApplicationResourceUsageReport getResourceUsageReport() {
@ -1032,6 +1034,11 @@ public ApplicationResourceUsageReport getResourceUsageReport() {
Resource cluster = rmContext.getScheduler().getClusterResource();
ResourceCalculator calc =
rmContext.getScheduler().getResourceCalculator();
Map<String, Long> preemptedResourceSecondsMaps = new HashMap<>();
preemptedResourceSecondsMaps
.put(ResourceInformation.MEMORY_MB.getName(), 0L);
preemptedResourceSecondsMaps
.put(ResourceInformation.VCORES.getName(), 0L);
float queueUsagePerc = 0.0f;
float clusterUsagePerc = 0.0f;
if (!calc.isInvalidDivisor(cluster)) {
@ -1041,15 +1048,15 @@ public ApplicationResourceUsageReport getResourceUsageReport() {
queueUsagePerc = calc.divide(cluster, usedResourceClone,
Resources.multiply(cluster, queueCapacityPerc)) * 100;
}
clusterUsagePerc = calc.divide(cluster, usedResourceClone, cluster)
* 100;
clusterUsagePerc =
calc.divide(cluster, usedResourceClone, cluster) * 100;
}
return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
reservedContainers.size(), usedResourceClone, reservedResourceClone,
Resources.add(usedResourceClone, reservedResourceClone),
runningResourceUsage.getMemorySeconds(),
runningResourceUsage.getVcoreSeconds(), queueUsagePerc,
clusterUsagePerc, 0, 0);
return ApplicationResourceUsageReport
.newInstance(liveContainers.size(), reservedContainers.size(),
usedResourceClone, reservedResourceClone,
Resources.add(usedResourceClone, reservedResourceClone),
runningResourceUsage.getResourceUsageSecondsMap(), queueUsagePerc,
clusterUsagePerc, preemptedResourceSecondsMaps);
} finally {
writeLock.unlock();
}

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.AppBlock;
import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.DIV;
@ -106,15 +107,12 @@ protected void createApplicationMetricsTable(Block html){
attemptResourcePreempted)
.__("Number of Non-AM Containers Preempted from Current Attempt:",
attemptNumNonAMContainerPreempted)
.__("Aggregate Resource Allocation:",
String.format("%d MB-seconds, %d vcore-seconds",
appMetrics == null ? "N/A" : appMetrics.getMemorySeconds(),
appMetrics == null ? "N/A" : appMetrics.getVcoreSeconds()))
.__("Aggregate Resource Allocation:", appMetrics == null ? "N/A" :
StringHelper
.getResourceSecondsString(appMetrics.getResourceSecondsMap()))
.__("Aggregate Preempted Resource Allocation:",
String.format("%d MB-seconds, %d vcore-seconds",
appMetrics == null ? "N/A" : appMetrics.getPreemptedMemorySeconds(),
appMetrics == null ? "N/A" :
appMetrics.getPreemptedVcoreSeconds()));
appMetrics == null ? "N/A" : StringHelper.getResourceSecondsString(
appMetrics.getPreemptedResourceSecondsMap()));
pdiv.__();
}

View File

@ -101,6 +101,7 @@ public class AppInfo {
private long vcoreSeconds;
protected float queueUsagePercentage;
protected float clusterUsagePercentage;
protected Map<String, Long> resourceSecondsMap;
// preemption info fields
private long preemptedResourceMB;
@ -109,6 +110,7 @@ public class AppInfo {
private int numAMContainerPreempted;
private long preemptedMemorySeconds;
private long preemptedVcoreSeconds;
protected Map<String, Long> preemptedResourceSecondsMap;
// list of resource requests
@XmlElement(name = "resourceRequests")
@ -236,8 +238,10 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess,
appMetrics.getResourcePreempted().getVirtualCores();
memorySeconds = appMetrics.getMemorySeconds();
vcoreSeconds = appMetrics.getVcoreSeconds();
resourceSecondsMap = appMetrics.getResourceSecondsMap();
preemptedMemorySeconds = appMetrics.getPreemptedMemorySeconds();
preemptedVcoreSeconds = appMetrics.getPreemptedVcoreSeconds();
preemptedResourceSecondsMap = appMetrics.getPreemptedResourceSecondsMap();
ApplicationSubmissionContext appSubmissionContext =
app.getApplicationSubmissionContext();
unmanagedApplication = appSubmissionContext.getUnmanagedAM();
@ -415,6 +419,22 @@ public long getReservedVCores() {
return this.reservedVCores;
}
public long getPreemptedMB() {
return preemptedResourceMB;
}
public long getPreemptedVCores() {
return preemptedResourceVCores;
}
public int getNumNonAMContainersPreempted() {
return numNonAMContainerPreempted;
}
public int getNumAMContainersPreempted() {
return numAMContainerPreempted;
}
public long getMemorySeconds() {
return memorySeconds;
}
@ -423,6 +443,10 @@ public long getVcoreSeconds() {
return vcoreSeconds;
}
public Map<String, Long> getResourceSecondsMap() {
return resourceSecondsMap;
}
public long getPreemptedMemorySeconds() {
return preemptedMemorySeconds;
}
@ -431,6 +455,10 @@ public long getPreemptedVcoreSeconds() {
return preemptedVcoreSeconds;
}
public Map<String, Long> getPreemptedResourceSecondsMap() {
return preemptedResourceSecondsMap;
}
public List<ResourceRequestInfo> getResourceRequests() {
return this.resourceRequests;
}

View File

@ -20,46 +20,68 @@
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@XmlAccessorType(XmlAccessType.NONE)
public class ResourceInfo {
@XmlElement
long memory;
@XmlElement
int vCores;
private Resource resources;
public ResourceInfo() {
}
public ResourceInfo(Resource res) {
memory = res.getMemorySize();
vCores = res.getVirtualCores();
resources = Resources.clone(res);
}
public long getMemorySize() {
return memory;
if (resources == null) {
resources = Resource.newInstance(memory, vCores);
}
return resources.getMemorySize();
}
public int getvCores() {
return vCores;
if (resources == null) {
resources = Resource.newInstance(memory, vCores);
}
return resources.getVirtualCores();
}
@Override
public String toString() {
return "<memory:" + memory + ", vCores:" + vCores + ">";
return resources.toString();
}
public void setMemory(int memory) {
if (resources == null) {
resources = Resource.newInstance(memory, vCores);
}
this.memory = memory;
resources.setMemorySize(memory);
}
public void setvCores(int vCores) {
if (resources == null) {
resources = Resource.newInstance(memory, vCores);
}
this.vCores = vCores;
resources.setVirtualCores(vCores);
}
public Resource getResource() {
return Resource.newInstance(memory, vCores);
return Resource.newInstance(resources);
}
}

View File

@ -73,7 +73,7 @@ public ResourceInfo getMaxAllocation() {
}
public String getSchedulerResourceTypes() {
return this.schedulingResourceTypes.toString();
return minAllocResource.getResource().getResources().keySet().toString();
}
public int getMaxClusterLevelAppPriority() {

View File

@ -87,6 +87,8 @@ message ApplicationAttemptStateDataProto {
optional int64 finish_time = 12;
optional int64 preempted_memory_seconds = 13;
optional int64 preempted_vcore_seconds = 14;
repeated StringLongMapProto application_resource_usage_map = 15;
repeated StringLongMapProto preempted_resource_usage_map = 16;
}
message EpochProto {

View File

@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
@ -56,6 +57,7 @@
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -828,9 +830,12 @@ public void testEscapeApplicationSummary() {
when(app.getState()).thenReturn(RMAppState.RUNNING);
when(app.getApplicationType()).thenReturn("MAPREDUCE");
when(app.getSubmitTime()).thenReturn(1000L);
Map<String, Long> resourceSecondsMap = new HashMap<>();
resourceSecondsMap.put(ResourceInformation.MEMORY_MB.getName(), 16384L);
resourceSecondsMap.put(ResourceInformation.VCORES.getName(), 64L);
RMAppMetrics metrics =
new RMAppMetrics(Resource.newInstance(1234, 56),
10, 1, 16384, 64, 0, 0);
10, 1, resourceSecondsMap, new HashMap<>());
when(app.getRMAppMetrics()).thenReturn(metrics);
RMAppManager.ApplicationSummary.SummaryBuilder summary =

View File

@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.time.DateUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -32,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@ -422,6 +424,9 @@ private AggregateAppResourceUsage calculateContainerResourceMetrics(
* usedMillis / DateUtils.MILLIS_PER_SECOND;
long vcoreSeconds = resource.getVirtualCores()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
return new AggregateAppResourceUsage(memorySeconds, vcoreSeconds);
Map<String, Long> map = new HashMap<>();
map.put(ResourceInformation.MEMORY_MB.getName(), memorySeconds);
map.put(ResourceInformation.VCORES.getName(), vcoreSeconds);
return new AggregateAppResourceUsage(map);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -189,7 +190,8 @@ public Set<NodeId> getRanNodes() {
@Override
public RMAppMetrics getRMAppMetrics() {
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0, 0, 0);
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, new HashMap<>(),
new HashMap<>());
}
@Override
@ -337,8 +339,9 @@ public Set<String> getApplicationTags() {
public ApplicationReport createAndGetApplicationReport(
String clientUserName, boolean allowAccess) {
ApplicationResourceUsageReport usageReport =
ApplicationResourceUsageReport.newInstance(0, 0, null, null, null,
0, 0, 0, 0, 0, 0);
ApplicationResourceUsageReport
.newInstance(0, 0, null, null, null, new HashMap<>(), 0, 0,
new HashMap<>());
ApplicationReport report = ApplicationReport.newInstance(
getApplicationId(), appAttemptId, getUser(), getQueue(),
getName(), null, 0, null, null, getDiagnostics().toString(),

View File

@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -40,6 +41,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -506,9 +508,16 @@ private static RMApp createRMApp(ApplicationId appId) {
when(app.getCurrentAppAttempt()).thenReturn(appAttempt);
when(app.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
when(app.getRMAppMetrics()).thenReturn(
new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
Integer.MAX_VALUE, Long.MAX_VALUE));
Map<String, Long> resourceMap = new HashMap<>();
resourceMap
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
resourceMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
Map<String, Long> preemptedMap = new HashMap<>();
preemptedMap
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
preemptedMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
when(app.getRMAppMetrics())
.thenReturn(new RMAppMetrics(null, 0, 0, resourceMap, preemptedMap));
Set<String> appTags = new HashSet<String>();
appTags.add("test");
appTags.add("tags");

View File

@ -29,6 +29,8 @@
import java.io.FileReader;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -46,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@ -357,15 +360,20 @@ private static RMApp createRMApp(ApplicationId appId) {
when(app.getDiagnostics()).thenReturn(
new StringBuilder("test diagnostics info"));
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(
ApplicationAttemptId.newInstance(appId, 1));
when(appAttempt.getAppAttemptId())
.thenReturn(ApplicationAttemptId.newInstance(appId, 1));
when(app.getCurrentAppAttempt()).thenReturn(appAttempt);
when(app.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
when(app.getFinalApplicationStatus())
.thenReturn(FinalApplicationStatus.UNDEFINED);
Map<String, Long> resourceSecondsMap = new HashMap<>();
resourceSecondsMap
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
resourceSecondsMap
.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
when(app.getRMAppMetrics()).thenReturn(
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, Integer.MAX_VALUE,
Long.MAX_VALUE, 0, 0));
when(app.getApplicationTags()).thenReturn(Collections.<String> emptySet());
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, resourceSecondsMap,
new HashMap<>()));
when(app.getApplicationTags()).thenReturn(Collections.<String>emptySet());
ApplicationSubmissionContext appSubmissionContext =
mock(ApplicationSubmissionContext.class);
when(appSubmissionContext.getPriority())

View File

@ -194,7 +194,7 @@ protected RMAppAttempt storeAttempt(RMStateStore store,
when(mockAttempt.getRMAppAttemptMetrics())
.thenReturn(mockRmAppAttemptMetrics);
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
.thenReturn(new AggregateAppResourceUsage(0, 0));
.thenReturn(new AggregateAppResourceUsage(new HashMap<>()));
dispatcher.attemptId = attemptId;
store.storeNewApplicationAttempt(mockAttempt);
waitNotify(dispatcher);
@ -292,7 +292,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper,
when(mockRemovedAttempt.getRMAppAttemptMetrics())
.thenReturn(mockRmAppAttemptMetrics);
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
.thenReturn(new AggregateAppResourceUsage(0,0));
.thenReturn(new AggregateAppResourceUsage(new HashMap<>()));
attempts.put(attemptIdRemoved, mockRemovedAttempt);
store.removeApplication(mockRemovedApp);
@ -369,7 +369,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper,
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 100,
oldAttemptState.getFinishTime(), 0, 0, 0, 0);
oldAttemptState.getFinishTime(), new HashMap<>(), new HashMap<>());
store.updateApplicationAttemptState(newAttemptState);
// test updating the state of an app/attempt whose initial state was not
@ -393,7 +393,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper,
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 111,
oldAttemptState.getFinishTime(), 0, 0, 0, 0);
oldAttemptState.getFinishTime(), new HashMap<>(), new HashMap<>());
store.updateApplicationAttemptState(dummyAttempt);
// let things settle down

View File

@ -33,12 +33,7 @@
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil;
@ -511,7 +506,7 @@ public void testFencedState() throws Exception {
when(mockAttempt.getRMAppAttemptMetrics())
.thenReturn(mockRmAppAttemptMetrics);
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
.thenReturn(new AggregateAppResourceUsage(0,0));
.thenReturn(new AggregateAppResourceUsage(new HashMap<>()));
store.storeNewApplicationAttempt(mockAttempt);
assertEquals("RMStateStore should have been in fenced state",
true, store.isFencedState());
@ -523,7 +518,7 @@ public void testFencedState() throws Exception {
store.getCredentialsFromAppAttempt(mockAttempt),
startTime, RMAppAttemptState.FINISHED, "testUrl",
"test", FinalApplicationStatus.SUCCEEDED, 100,
finishTime, 0, 0, 0, 0);
finishTime, new HashMap<>(), new HashMap<>());
store.updateApplicationAttemptState(newAttemptState);
assertEquals("RMStateStore should have been in fenced state",
true, store.isFencedState());
@ -751,10 +746,20 @@ private static ApplicationStateData createAppState(
private static ApplicationAttemptStateData createFinishedAttempt(
ApplicationAttemptId attemptId, Container container, long startTime,
int amExitStatus) {
Map<String, Long> resourceSecondsMap = new HashMap<>();
Map<String, Long> preemptedResoureSecondsMap = new HashMap<>();
resourceSecondsMap
.put(ResourceInformation.MEMORY_MB.getName(), 0L);
resourceSecondsMap
.put(ResourceInformation.VCORES.getName(), 0L);
preemptedResoureSecondsMap.put(ResourceInformation.MEMORY_MB.getName(),
0L);
preemptedResoureSecondsMap
.put(ResourceInformation.VCORES.getName(), 0L);
return ApplicationAttemptStateData.newInstance(attemptId,
container, null, startTime, RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
amExitStatus, 0, 0, 0, 0, 0);
amExitStatus, 0, resourceSecondsMap, preemptedResoureSecondsMap);
}
private ApplicationAttemptId storeAttempt(RMStateStore store,

View File

@ -22,6 +22,7 @@
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -62,9 +63,10 @@ public void testAppBlockRenderWithNullCurrentAppAttempt() throws Exception {
when(app.getStartTime()).thenReturn(0L);
when(app.getFinishTime()).thenReturn(0L);
when(app.createApplicationState()).thenReturn(YarnApplicationState.FAILED);
RMAppMetrics appMetrics = new RMAppMetrics(
Resource.newInstance(0, 0), 0, 0, 0, 0, 0, 0);
RMAppMetrics appMetrics =
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, new HashMap<>(),
new HashMap<>());
when(app.getRMAppMetrics()).thenReturn(appMetrics);
// initialize RM Context, and create RMApp, without creating RMAppAttempt

View File

@ -51,6 +51,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
@ -136,8 +137,8 @@ private static RMContext mockRMContext(List<RMAppState> states) {
MockRMApp app = new MockRMApp(i, i, state) {
@Override
public RMAppMetrics getRMAppMetrics() {
return new RMAppMetrics(Resource.newInstance(0, 0),
0, 0, 0, 0, 0, 0);
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0,
new HashMap<>(), new HashMap<>());
}
@Override
public YarnApplicationState createApplicationState() {

View File

@ -27,6 +27,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@ -59,6 +61,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
@ -303,6 +307,18 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
return clientRMProxy.updateApplicationTimeouts(request);
}
@Override
public GetAllResourceProfilesResponse getResourceProfiles(
GetAllResourceProfilesRequest request) throws YarnException, IOException {
return clientRMProxy.getResourceProfiles(request);
}
@Override
public GetResourceProfileResponse getResourceProfile(
GetResourceProfileRequest request) throws YarnException, IOException {
return clientRMProxy.getResourceProfile(request);
}
@VisibleForTesting
public void setRMClient(ApplicationClientProtocol clientRM) {
this.clientRMProxy = clientRM;

View File

@ -32,6 +32,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@ -64,6 +66,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
@ -709,4 +713,15 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
throw new NotImplementedException();
}
@Override
public GetAllResourceProfilesResponse getResourceProfiles(
GetAllResourceProfilesRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetResourceProfileResponse getResourceProfile(
GetResourceProfileRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
}

View File

@ -38,6 +38,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@ -70,6 +72,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
@ -403,6 +407,20 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
return pipeline.getRootInterceptor().updateApplicationTimeouts(request);
}
@Override
public GetAllResourceProfilesResponse getResourceProfiles(
GetAllResourceProfilesRequest request) throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getResourceProfiles(request);
}
@Override
public GetResourceProfileResponse getResourceProfile(
GetResourceProfileRequest request) throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getResourceProfile(request);
}
private RequestInterceptorChainWrapper getInterceptorChain()
throws IOException {
String user = UserGroupInformation.getCurrentUser().getUserName();

View File

@ -24,6 +24,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@ -56,6 +58,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
@ -264,4 +268,16 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
throws YarnException, IOException {
return getNextInterceptor().updateApplicationTimeouts(request);
}
@Override
public GetAllResourceProfilesResponse getResourceProfiles(
GetAllResourceProfilesRequest request) throws YarnException, IOException {
return getNextInterceptor().getResourceProfiles(request);
}
@Override
public GetResourceProfileResponse getResourceProfile(
GetResourceProfileRequest request) throws YarnException, IOException {
return getNextInterceptor().getResourceProfile(request);
}
}