YARN-9181. Backport YARN-6232 for generic resource type usage to branch-2
This commit is contained in:
parent
c5e77e8b84
commit
7f614f7cee
|
@ -24,6 +24,9 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Contains various scheduling metrics to be reported by UI and CLI.
|
||||
*/
|
||||
|
@ -35,9 +38,9 @@ public abstract class ApplicationResourceUsageReport {
|
|||
@Unstable
|
||||
public static ApplicationResourceUsageReport newInstance(
|
||||
int numUsedContainers, int numReservedContainers, Resource usedResources,
|
||||
Resource reservedResources, Resource neededResources, long memorySeconds,
|
||||
long vcoreSeconds, float queueUsagePerc, float clusterUsagePerc,
|
||||
long preemptedMemorySeconds, long preemptedVcoresSeconds) {
|
||||
Resource reservedResources, Resource neededResources,
|
||||
Map<String, Long> resourceSecondsMap, float queueUsagePerc,
|
||||
float clusterUsagePerc, Map<String, Long> preemtedResourceSecondsMap) {
|
||||
ApplicationResourceUsageReport report =
|
||||
Records.newRecord(ApplicationResourceUsageReport.class);
|
||||
report.setNumUsedContainers(numUsedContainers);
|
||||
|
@ -45,12 +48,10 @@ public abstract class ApplicationResourceUsageReport {
|
|||
report.setUsedResources(usedResources);
|
||||
report.setReservedResources(reservedResources);
|
||||
report.setNeededResources(neededResources);
|
||||
report.setMemorySeconds(memorySeconds);
|
||||
report.setVcoreSeconds(vcoreSeconds);
|
||||
report.setResourceSecondsMap(resourceSecondsMap);
|
||||
report.setQueueUsagePercentage(queueUsagePerc);
|
||||
report.setClusterUsagePercentage(clusterUsagePerc);
|
||||
report.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
report.setPreemptedVcoreSeconds(preemptedVcoresSeconds);
|
||||
report.setPreemptedResourceSecondsMap(preemtedResourceSecondsMap);
|
||||
return report;
|
||||
}
|
||||
|
||||
|
@ -229,4 +230,47 @@ public abstract class ApplicationResourceUsageReport {
|
|||
@Public
|
||||
@Unstable
|
||||
public abstract long getPreemptedVcoreSeconds();
|
||||
|
||||
/**
|
||||
* Get the aggregated number of resources that the application has
|
||||
* allocated times the number of seconds the application has been running.
|
||||
* @return map containing the resource name and aggregated resource-seconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract Map<String, Long> getResourceSecondsMap();
|
||||
|
||||
/**
|
||||
* Set the aggregated number of resources that the application has
|
||||
* allocated times the number of seconds the application has been running.
|
||||
* @param resourceSecondsMap map containing the resource name and aggregated
|
||||
* resource-seconds
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setResourceSecondsMap(
|
||||
Map<String, Long> resourceSecondsMap);
|
||||
|
||||
|
||||
/**
|
||||
* Get the aggregated number of resources preempted that the application has
|
||||
* allocated times the number of seconds the application has been running.
|
||||
* @return map containing the resource name and aggregated preempted
|
||||
* resource-seconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract Map<String, Long> getPreemptedResourceSecondsMap();
|
||||
|
||||
/**
|
||||
* Set the aggregated number of resources preempted that the application has
|
||||
* allocated times the number of seconds the application has been running.
|
||||
* @param preemptedResourceSecondsMap map containing the resource name and
|
||||
* aggregated preempted resource-seconds
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setPreemptedResourceSecondsMap(
|
||||
Map<String, Long> preemptedResourceSecondsMap);
|
||||
|
||||
}
|
||||
|
|
|
@ -217,6 +217,11 @@ message LocalResourceProto {
|
|||
optional bool should_be_uploaded_to_shared_cache = 7;
|
||||
}
|
||||
|
||||
message StringLongMapProto {
|
||||
required string key = 1;
|
||||
required int64 value = 2;
|
||||
}
|
||||
|
||||
message ApplicationResourceUsageReportProto {
|
||||
optional int32 num_used_containers = 1;
|
||||
optional int32 num_reserved_containers = 2;
|
||||
|
@ -229,6 +234,8 @@ message ApplicationResourceUsageReportProto {
|
|||
optional float cluster_usage_percentage = 9;
|
||||
optional int64 preempted_memory_seconds = 10;
|
||||
optional int64 preempted_vcore_seconds = 11;
|
||||
repeated StringLongMapProto application_resource_usage_map = 12;
|
||||
repeated StringLongMapProto application_preempted_resource_usage_map = 13;
|
||||
}
|
||||
|
||||
message ApplicationReportProto {
|
||||
|
|
|
@ -62,6 +62,8 @@ import org.apache.hadoop.yarn.util.Times;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import static org.apache.hadoop.yarn.util.StringHelper.getResourceSecondsString;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class ApplicationCLI extends YarnCLI {
|
||||
|
@ -711,24 +713,9 @@ public class ApplicationCLI extends YarnCLI {
|
|||
appReportStr.println(appReport.getRpcPort());
|
||||
appReportStr.print("\tAM Host : ");
|
||||
appReportStr.println(appReport.getHost());
|
||||
appReportStr.print("\tAggregate Resource Allocation : ");
|
||||
|
||||
ApplicationResourceUsageReport usageReport =
|
||||
appReport.getApplicationResourceUsageReport();
|
||||
if (usageReport != null) {
|
||||
//completed app report in the timeline server doesn't have usage report
|
||||
appReportStr.print(usageReport.getMemorySeconds() + " MB-seconds, ");
|
||||
appReportStr.println(usageReport.getVcoreSeconds() + " vcore-seconds");
|
||||
appReportStr.print("\tAggregate Resource Preempted : ");
|
||||
appReportStr.print(usageReport.getPreemptedMemorySeconds() +
|
||||
" MB-seconds, ");
|
||||
appReportStr.println(usageReport.getPreemptedVcoreSeconds() +
|
||||
" vcore-seconds");
|
||||
} else {
|
||||
appReportStr.println("N/A");
|
||||
appReportStr.print("\tAggregate Resource Preempted : ");
|
||||
appReportStr.println("N/A");
|
||||
}
|
||||
printResourceUsage(appReportStr, usageReport);
|
||||
appReportStr.print("\tLog Aggregation Status : ");
|
||||
appReportStr.println(appReport.getLogAggregationStatus() == null ? "N/A"
|
||||
: appReport.getLogAggregationStatus());
|
||||
|
@ -759,6 +746,22 @@ public class ApplicationCLI extends YarnCLI {
|
|||
return 0;
|
||||
}
|
||||
|
||||
private void printResourceUsage(PrintWriter appReportStr,
|
||||
ApplicationResourceUsageReport usageReport) {
|
||||
appReportStr.print("\tAggregate Resource Allocation : ");
|
||||
if (usageReport != null) {
|
||||
appReportStr.println(
|
||||
getResourceSecondsString(usageReport.getResourceSecondsMap()));
|
||||
appReportStr.print("\tAggregate Resource Preempted : ");
|
||||
appReportStr.println(getResourceSecondsString(
|
||||
usageReport.getPreemptedResourceSecondsMap()));
|
||||
} else {
|
||||
appReportStr.println("N/A");
|
||||
appReportStr.print("\tAggregate Resource Preempted : ");
|
||||
appReportStr.println("N/A");
|
||||
}
|
||||
}
|
||||
|
||||
private String getAllValidApplicationStates() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("The valid application state can be" + " one of the following: ");
|
||||
|
|
|
@ -39,8 +39,10 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
@ -69,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
|
@ -118,9 +121,18 @@ public class TestYarnCLI {
|
|||
for (int i = 0; i < 2; ++i) {
|
||||
ApplicationCLI cli = createAndGetAppCLI();
|
||||
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
|
||||
Map<String, Long> resourceSecondsMap = new HashMap<>();
|
||||
Map<String, Long> preemptedResoureSecondsMap = new HashMap<>();
|
||||
resourceSecondsMap.put(ResourceInformation.MEMORY_MB.getName(), 123456L);
|
||||
resourceSecondsMap.put(ResourceInformation.VCORES.getName(), 4567L);
|
||||
preemptedResoureSecondsMap
|
||||
.put(ResourceInformation.MEMORY_MB.getName(), 1111L);
|
||||
preemptedResoureSecondsMap
|
||||
.put(ResourceInformation.VCORES.getName(), 2222L);
|
||||
ApplicationResourceUsageReport usageReport = i == 0 ? null :
|
||||
ApplicationResourceUsageReport.newInstance(
|
||||
2, 0, null, null, null, 123456, 4567, 0, 0, 1111, 2222);
|
||||
ApplicationResourceUsageReport
|
||||
.newInstance(2, 0, null, null, null, resourceSecondsMap, 0, 0,
|
||||
preemptedResoureSecondsMap);
|
||||
ApplicationReport newApplicationReport = ApplicationReport.newInstance(
|
||||
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
|
||||
"user", "queue", "appname", "host", 124, null,
|
||||
|
|
|
@ -22,12 +22,16 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class ApplicationResourceUsageReportPBImpl
|
||||
|
@ -41,6 +45,9 @@ extends ApplicationResourceUsageReport {
|
|||
Resource reservedResources;
|
||||
Resource neededResources;
|
||||
|
||||
private Map<String, Long> resourceSecondsMap;
|
||||
private Map<String, Long> preemptedResourceSecondsMap;
|
||||
|
||||
public ApplicationResourceUsageReportPBImpl() {
|
||||
builder = ApplicationResourceUsageReportProto.newBuilder();
|
||||
}
|
||||
|
@ -49,6 +56,8 @@ extends ApplicationResourceUsageReport {
|
|||
ApplicationResourceUsageReportProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
getResourceSecondsMap();
|
||||
getPreemptedResourceSecondsMap();
|
||||
}
|
||||
|
||||
public synchronized ApplicationResourceUsageReportProto getProto() {
|
||||
|
@ -89,6 +98,23 @@ extends ApplicationResourceUsageReport {
|
|||
if (this.neededResources != null) {
|
||||
builder.setNeededResources(convertToProtoFormat(this.neededResources));
|
||||
}
|
||||
builder.clearApplicationResourceUsageMap();
|
||||
builder.clearApplicationPreemptedResourceUsageMap();
|
||||
|
||||
if (preemptedResourceSecondsMap != null && !preemptedResourceSecondsMap
|
||||
.isEmpty()) {
|
||||
builder.addAllApplicationPreemptedResourceUsageMap(ProtoUtils
|
||||
.convertMapToStringLongMapProtoList(preemptedResourceSecondsMap));
|
||||
}
|
||||
if (resourceSecondsMap != null && !resourceSecondsMap.isEmpty()) {
|
||||
builder.addAllApplicationResourceUsageMap(
|
||||
ProtoUtils.convertMapToStringLongMapProtoList(resourceSecondsMap));
|
||||
}
|
||||
|
||||
builder.setMemorySeconds(this.getMemorySeconds());
|
||||
builder.setVcoreSeconds(this.getVcoreSeconds());
|
||||
builder.setPreemptedMemorySeconds(this.getPreemptedMemorySeconds());
|
||||
builder.setPreemptedVcoreSeconds(this.getPreemptedVcoreSeconds());
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -196,54 +222,64 @@ extends ApplicationResourceUsageReport {
|
|||
|
||||
@Override
|
||||
public synchronized void setMemorySeconds(long memory_seconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setMemorySeconds(memory_seconds);
|
||||
getResourceSecondsMap()
|
||||
.put(ResourceInformation.MEMORY_MB.getName(), memory_seconds);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized long getMemorySeconds() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getMemorySeconds();
|
||||
Map<String, Long> tmp = getResourceSecondsMap();
|
||||
if (tmp.containsKey(ResourceInformation.MEMORY_MB.getName())) {
|
||||
return tmp.get(ResourceInformation.MEMORY_MB.getName());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setVcoreSeconds(long vcore_seconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setVcoreSeconds(vcore_seconds);
|
||||
getResourceSecondsMap()
|
||||
.put(ResourceInformation.VCORES.getName(), vcore_seconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized long getVcoreSeconds() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return (p.getVcoreSeconds());
|
||||
Map<String, Long> tmp = getResourceSecondsMap();
|
||||
if (tmp.containsKey(ResourceInformation.VCORES.getName())) {
|
||||
return tmp.get(ResourceInformation.VCORES.getName());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setPreemptedMemorySeconds(
|
||||
long preemptedMemorySeconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
getPreemptedResourceSecondsMap()
|
||||
.put(ResourceInformation.MEMORY_MB.getName(), preemptedMemorySeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized long getPreemptedMemorySeconds() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getPreemptedMemorySeconds();
|
||||
Map<String, Long> tmp = getPreemptedResourceSecondsMap();
|
||||
if (tmp.containsKey(ResourceInformation.MEMORY_MB.getName())) {
|
||||
return tmp.get(ResourceInformation.MEMORY_MB.getName());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setPreemptedVcoreSeconds(
|
||||
long vcoreSeconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setPreemptedVcoreSeconds(vcoreSeconds);
|
||||
getPreemptedResourceSecondsMap()
|
||||
.put(ResourceInformation.VCORES.getName(), vcoreSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized long getPreemptedVcoreSeconds() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return (p.getPreemptedVcoreSeconds());
|
||||
Map<String, Long> tmp = getPreemptedResourceSecondsMap();
|
||||
if (tmp.containsKey(ResourceInformation.VCORES.getName())) {
|
||||
return tmp.get(ResourceInformation.VCORES.getName());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
|
||||
|
@ -277,4 +313,81 @@ extends ApplicationResourceUsageReport {
|
|||
maybeInitBuilder();
|
||||
builder.setClusterUsagePercentage((clusterUsagePerc));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setResourceSecondsMap(
|
||||
Map<String, Long> resourceSecondsMap) {
|
||||
this.resourceSecondsMap = resourceSecondsMap;
|
||||
if (resourceSecondsMap == null) {
|
||||
return;
|
||||
}
|
||||
if (!resourceSecondsMap
|
||||
.containsKey(ResourceInformation.MEMORY_MB.getName())) {
|
||||
this.setMemorySeconds(0L);
|
||||
}
|
||||
if (!resourceSecondsMap.containsKey(ResourceInformation.VCORES.getName())) {
|
||||
this.setVcoreSeconds(0L);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Map<String, Long> getResourceSecondsMap() {
|
||||
if (this.resourceSecondsMap != null) {
|
||||
return this.resourceSecondsMap;
|
||||
}
|
||||
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
this.resourceSecondsMap = ProtoUtils
|
||||
.convertStringLongMapProtoListToMap(
|
||||
p.getApplicationResourceUsageMapList());
|
||||
if (!this.resourceSecondsMap
|
||||
.containsKey(ResourceInformation.MEMORY_MB.getName())) {
|
||||
this.setMemorySeconds(p.getMemorySeconds());
|
||||
}
|
||||
if (!this.resourceSecondsMap
|
||||
.containsKey(ResourceInformation.VCORES.getName())) {
|
||||
this.setVcoreSeconds(p.getVcoreSeconds());
|
||||
}
|
||||
this.setMemorySeconds(p.getMemorySeconds());
|
||||
this.setVcoreSeconds(p.getVcoreSeconds());
|
||||
return this.resourceSecondsMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setPreemptedResourceSecondsMap(
|
||||
Map<String, Long> preemptedResourceSecondsMap) {
|
||||
this.preemptedResourceSecondsMap = preemptedResourceSecondsMap;
|
||||
if (preemptedResourceSecondsMap == null) {
|
||||
return;
|
||||
}
|
||||
if (!preemptedResourceSecondsMap
|
||||
.containsKey(ResourceInformation.MEMORY_MB.getName())) {
|
||||
this.setPreemptedMemorySeconds(0L);
|
||||
}
|
||||
if (!preemptedResourceSecondsMap
|
||||
.containsKey(ResourceInformation.VCORES.getName())) {
|
||||
this.setPreemptedVcoreSeconds(0L);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Map<String, Long> getPreemptedResourceSecondsMap() {
|
||||
if (this.preemptedResourceSecondsMap != null) {
|
||||
return this.preemptedResourceSecondsMap;
|
||||
}
|
||||
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
this.preemptedResourceSecondsMap = ProtoUtils
|
||||
.convertStringLongMapProtoListToMap(
|
||||
p.getApplicationPreemptedResourceUsageMapList());
|
||||
if (!this.preemptedResourceSecondsMap
|
||||
.containsKey(ResourceInformation.MEMORY_MB.getName())) {
|
||||
this.setPreemptedMemorySeconds(p.getPreemptedMemorySeconds());
|
||||
}
|
||||
if (!this.preemptedResourceSecondsMap
|
||||
.containsKey(ResourceInformation.VCORES.getName())) {
|
||||
this.setPreemptedVcoreSeconds(p.getPreemptedVcoreSeconds());
|
||||
}
|
||||
this.setPreemptedMemorySeconds(p.getPreemptedMemorySeconds());
|
||||
this.setPreemptedVcoreSeconds(p.getPreemptedVcoreSeconds());
|
||||
return this.preemptedResourceSecondsMap;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,10 @@
|
|||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
|
@ -46,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
|
|||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
|
@ -462,6 +467,35 @@ public class ProtoUtils {
|
|||
public static ResourceTypes convertFromProtoFormat(ResourceTypesProto e) {
|
||||
return ResourceTypes.valueOf(e.name());
|
||||
}
|
||||
|
||||
public static Map<String, Long> convertStringLongMapProtoListToMap(
|
||||
List<YarnProtos.StringLongMapProto> pList) {
|
||||
Resource tmp = Resource.newInstance(0, 0);
|
||||
Map<String, Long> ret = new HashMap<>();
|
||||
for (Map.Entry<String, ResourceInformation> entry : tmp.getResources()
|
||||
.entrySet()) {
|
||||
ret.put(entry.getKey(), 0L);
|
||||
}
|
||||
if (pList != null) {
|
||||
for (YarnProtos.StringLongMapProto p : pList) {
|
||||
ret.put(p.getKey(), p.getValue());
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public static List<YarnProtos.StringLongMapProto> convertMapToStringLongMapProtoList(
|
||||
Map<String, Long> map) {
|
||||
List<YarnProtos.StringLongMapProto> ret = new ArrayList<>();
|
||||
for (Map.Entry<String, Long> entry : map.entrySet()) {
|
||||
YarnProtos.StringLongMapProto.Builder tmp =
|
||||
YarnProtos.StringLongMapProto.newBuilder();
|
||||
tmp.setKey(entry.getKey());
|
||||
tmp.setValue(entry.getValue());
|
||||
ret.add(tmp.build());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -20,9 +20,15 @@ package org.apache.hadoop.yarn.util;
|
|||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Splitter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
||||
/**
|
||||
* Common string manipulation helpers
|
||||
|
@ -174,4 +180,34 @@ public final class StringHelper {
|
|||
}
|
||||
sb.append(part);
|
||||
}
|
||||
|
||||
public static String getResourceSecondsString(Map<String, Long> targetMap) {
|
||||
List<String> strings = new ArrayList<>(targetMap.size());
|
||||
//completed app report in the timeline server doesn't have usage report
|
||||
Long memorySeconds = 0L;
|
||||
Long vcoreSeconds = 0L;
|
||||
if (targetMap.containsKey(ResourceInformation.MEMORY_MB.getName())) {
|
||||
memorySeconds = targetMap.get(ResourceInformation.MEMORY_MB.getName());
|
||||
}
|
||||
if (targetMap.containsKey(ResourceInformation.VCORES.getName())) {
|
||||
vcoreSeconds = targetMap.get(ResourceInformation.VCORES.getName());
|
||||
}
|
||||
strings.add(memorySeconds + " MB-seconds");
|
||||
strings.add(vcoreSeconds + " vcore-seconds");
|
||||
Map<String, ResourceInformation> tmp = ResourceUtils.getResourceTypes();
|
||||
if (targetMap.size() > 2) {
|
||||
for (Map.Entry<String, Long> entry : targetMap.entrySet()) {
|
||||
if (!entry.getKey().equals(ResourceInformation.MEMORY_MB.getName())
|
||||
&& !entry.getKey().equals(ResourceInformation.VCORES.getName())) {
|
||||
String units = "";
|
||||
if (tmp.containsKey(entry.getKey())) {
|
||||
units = tmp.get(entry.getKey()).getUnits();
|
||||
}
|
||||
strings.add(entry.getValue() + " " + entry.getKey() + "-" + units
|
||||
+ "seconds");
|
||||
}
|
||||
}
|
||||
}
|
||||
return Joiner.on(", ").join(strings);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,6 +37,9 @@ public class BasePBImplRecordsTest {
|
|||
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||
protected static HashMap<Type, Object> typeValueCache =
|
||||
new HashMap<Type, Object>();
|
||||
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||
protected static HashMap<Type, List<String>> excludedPropertiesMap =
|
||||
new HashMap<>();
|
||||
private static Random rand = new Random();
|
||||
private static byte [] bytes = new byte[] {'1', '2', '3', '4'};
|
||||
|
||||
|
@ -167,6 +170,10 @@ public class BasePBImplRecordsTest {
|
|||
private <R> Map<String, GetSetPair> getGetSetPairs(Class<R> recordClass)
|
||||
throws Exception {
|
||||
Map<String, GetSetPair> ret = new HashMap<String, GetSetPair>();
|
||||
List<String> excluded = null;
|
||||
if (excludedPropertiesMap.containsKey(recordClass.getClass())) {
|
||||
excluded = excludedPropertiesMap.get(recordClass.getClass());
|
||||
}
|
||||
Method [] methods = recordClass.getDeclaredMethods();
|
||||
// get all get methods
|
||||
for (int i = 0; i < methods.length; i++) {
|
||||
|
@ -224,6 +231,11 @@ public class BasePBImplRecordsTest {
|
|||
(gsp.setMethod == null)) {
|
||||
LOG.info(String.format("Exclude potential property: %s\n", gsp.propertyName));
|
||||
itr.remove();
|
||||
} else if ((excluded != null && excluded.contains(gsp.propertyName))) {
|
||||
LOG.info(String.format(
|
||||
"Excluding potential property(present in exclusion list): %s\n",
|
||||
gsp.propertyName));
|
||||
itr.remove();
|
||||
} else {
|
||||
LOG.info(String.format("New property: %s type: %s", gsp.toString(), gsp.type));
|
||||
gsp.testValue = genTypeValue(gsp.type);
|
||||
|
|
|
@ -336,6 +336,8 @@ import org.junit.Ignore;
|
|||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* Test class for YARN API protocol records.
|
||||
|
@ -738,6 +740,8 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
|
|||
|
||||
@Test
|
||||
public void testApplicationResourceUsageReportPBImpl() throws Exception {
|
||||
excludedPropertiesMap.put(ApplicationResourceUsageReportPBImpl.class.getClass(),
|
||||
Arrays.asList("PreemptedResourceSecondsMap", "ResourceSecondsMap"));
|
||||
validatePBImplRecord(ApplicationResourceUsageReportPBImpl.class,
|
||||
ApplicationResourceUsageReportProto.class);
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||
|
@ -342,9 +343,20 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS);
|
||||
long preemptedVcoreSeconds = parseLong(entityInfo,
|
||||
ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS);
|
||||
appResources = ApplicationResourceUsageReport.newInstance(0, 0, null,
|
||||
null, null, memorySeconds, vcoreSeconds, 0, 0,
|
||||
preemptedMemorySeconds, preemptedVcoreSeconds);
|
||||
Map<String, Long> resourceSecondsMap = new HashMap<>();
|
||||
Map<String, Long> preemptedResoureSecondsMap = new HashMap<>();
|
||||
resourceSecondsMap
|
||||
.put(ResourceInformation.MEMORY_MB.getName(), memorySeconds);
|
||||
resourceSecondsMap
|
||||
.put(ResourceInformation.VCORES.getName(), vcoreSeconds);
|
||||
preemptedResoureSecondsMap.put(ResourceInformation.MEMORY_MB.getName(),
|
||||
preemptedMemorySeconds);
|
||||
preemptedResoureSecondsMap
|
||||
.put(ResourceInformation.VCORES.getName(), preemptedVcoreSeconds);
|
||||
|
||||
appResources = ApplicationResourceUsageReport
|
||||
.newInstance(0, 0, null, null, null, resourceSecondsMap, 0, 0,
|
||||
preemptedResoureSecondsMap);
|
||||
}
|
||||
|
||||
if (entityInfo.containsKey(ApplicationMetricsConstants.APP_TAGS_INFO)) {
|
||||
|
|
|
@ -65,8 +65,6 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
|||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
|
@ -436,12 +434,12 @@ public class BuilderUtils {
|
|||
queue, priority, amContainer, isUnmanagedAM, cancelTokensWhenComplete,
|
||||
maxAppAttempts, resource, null);
|
||||
}
|
||||
|
||||
|
||||
public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
|
||||
int numUsedContainers, int numReservedContainers, Resource usedResources,
|
||||
Resource reservedResources, Resource neededResources, long memorySeconds,
|
||||
long vcoreSeconds, long preemptedMemorySeconds,
|
||||
long preemptedVcoreSeconds) {
|
||||
Resource reservedResources, Resource neededResources,
|
||||
Map<String, Long> resourceSecondsMap,
|
||||
Map<String, Long> preemptedResourceSecondsMap) {
|
||||
ApplicationResourceUsageReport report =
|
||||
recordFactory.newRecordInstance(ApplicationResourceUsageReport.class);
|
||||
report.setNumUsedContainers(numUsedContainers);
|
||||
|
@ -449,10 +447,8 @@ public class BuilderUtils {
|
|||
report.setUsedResources(usedResources);
|
||||
report.setReservedResources(reservedResources);
|
||||
report.setNeededResources(neededResources);
|
||||
report.setMemorySeconds(memorySeconds);
|
||||
report.setVcoreSeconds(vcoreSeconds);
|
||||
report.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
report.setPreemptedVcoreSeconds(preemptedVcoreSeconds);
|
||||
report.setResourceSecondsMap(resourceSecondsMap);
|
||||
report.setPreemptedResourceSecondsMap(preemptedResourceSecondsMap);
|
||||
return report;
|
||||
}
|
||||
|
||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.util.Times;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import org.apache.hadoop.yarn.util.StringHelper;
|
||||
|
||||
/**
|
||||
* This class manages the list of applications for the resource manager.
|
||||
|
@ -189,7 +190,12 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
.add("preemptedAMContainers", metrics.getNumAMContainersPreempted())
|
||||
.add("preemptedNonAMContainers", metrics.getNumNonAMContainersPreempted())
|
||||
.add("preemptedResources", metrics.getResourcePreempted())
|
||||
.add("applicationType", app.getApplicationType());
|
||||
.add("applicationType", app.getApplicationType())
|
||||
.add("resourceSeconds", StringHelper
|
||||
.getResourceSecondsString(metrics.getResourceSecondsMap()))
|
||||
.add("preemptedResourceSeconds", StringHelper
|
||||
.getResourceSecondsString(
|
||||
metrics.getPreemptedResourceSecondsMap()));
|
||||
return summary;
|
||||
}
|
||||
|
||||
|
|
|
@ -486,7 +486,7 @@ public class RMServerUtils {
|
|||
DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
|
||||
BuilderUtils.newApplicationResourceUsageReport(-1, -1,
|
||||
Resources.createResource(-1, -1), Resources.createResource(-1, -1),
|
||||
Resources.createResource(-1, -1), 0, 0, 0, 0);
|
||||
Resources.createResource(-1, -1), new HashMap<String, Long>(), new HashMap<String, Long>());
|
||||
|
||||
|
||||
/**
|
||||
|
@ -630,4 +630,12 @@ public class RMServerUtils {
|
|||
return labelsToNodes.get(label);
|
||||
}
|
||||
}
|
||||
|
||||
public static Long getOrDefault(Map<String, Long> map, String key,
|
||||
Long defaultValue) {
|
||||
if (map.containsKey(key)) {
|
||||
return map.get(key);
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -853,11 +853,8 @@ public abstract class RMStateStore extends AbstractService {
|
|||
appAttempt.getAppAttemptId(),
|
||||
appAttempt.getMasterContainer(),
|
||||
credentials, appAttempt.getStartTime(),
|
||||
resUsage.getMemorySeconds(),
|
||||
resUsage.getVcoreSeconds(),
|
||||
attempMetrics.getPreemptedMemory(),
|
||||
attempMetrics.getPreemptedVcore()
|
||||
);
|
||||
resUsage.getResourceUsageSecondsMap(),
|
||||
attempMetrics.getPreemptedResourceSecondsMap());
|
||||
|
||||
getRMStateStoreEventHandler().handle(
|
||||
new RMStateStoreAppAttemptEvent(attemptState));
|
||||
|
|
|
@ -25,23 +25,28 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/*
|
||||
* Contains the state data that needs to be persisted for an ApplicationAttempt
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract class ApplicationAttemptStateData {
|
||||
|
||||
public static ApplicationAttemptStateData newInstance(
|
||||
ApplicationAttemptId attemptId, Container container,
|
||||
Credentials attemptTokens, long startTime, RMAppAttemptState finalState,
|
||||
String finalTrackingUrl, String diagnostics,
|
||||
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
|
||||
long finishTime, long memorySeconds, long vcoreSeconds,
|
||||
long preemptedMemorySeconds, long preemptedVcoreSeconds) {
|
||||
long finishTime, Map<String, Long> resourceSecondsMap,
|
||||
Map<String, Long> preemptedResourceSecondsMap) {
|
||||
ApplicationAttemptStateData attemptStateData =
|
||||
Records.newRecord(ApplicationAttemptStateData.class);
|
||||
attemptStateData.setAttemptId(attemptId);
|
||||
|
@ -54,23 +59,33 @@ public abstract class ApplicationAttemptStateData {
|
|||
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
|
||||
attemptStateData.setAMContainerExitStatus(exitStatus);
|
||||
attemptStateData.setFinishTime(finishTime);
|
||||
attemptStateData.setMemorySeconds(memorySeconds);
|
||||
attemptStateData.setVcoreSeconds(vcoreSeconds);
|
||||
attemptStateData.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
attemptStateData.setPreemptedVcoreSeconds(preemptedVcoreSeconds);
|
||||
attemptStateData.setMemorySeconds(RMServerUtils
|
||||
.getOrDefault(resourceSecondsMap,
|
||||
ResourceInformation.MEMORY_MB.getName(), 0L));
|
||||
attemptStateData.setVcoreSeconds(RMServerUtils
|
||||
.getOrDefault(resourceSecondsMap, ResourceInformation.VCORES.getName(),
|
||||
0L));
|
||||
attemptStateData.setPreemptedMemorySeconds(RMServerUtils
|
||||
.getOrDefault(preemptedResourceSecondsMap,
|
||||
ResourceInformation.MEMORY_MB.getName(), 0L));
|
||||
attemptStateData.setPreemptedVcoreSeconds(RMServerUtils
|
||||
.getOrDefault(preemptedResourceSecondsMap,
|
||||
ResourceInformation.VCORES.getName(), 0L));
|
||||
attemptStateData.setResourceSecondsMap(resourceSecondsMap);
|
||||
attemptStateData
|
||||
.setPreemptedResourceSecondsMap(preemptedResourceSecondsMap);
|
||||
return attemptStateData;
|
||||
}
|
||||
|
||||
public static ApplicationAttemptStateData newInstance(
|
||||
ApplicationAttemptId attemptId, Container masterContainer,
|
||||
Credentials attemptTokens, long startTime, long memorySeconds,
|
||||
long vcoreSeconds, long preemptedMemorySeconds,
|
||||
long preemptedVcoreSeconds) {
|
||||
return newInstance(attemptId, masterContainer, attemptTokens,
|
||||
startTime, null, "N/A", "", null, ContainerExitStatus.INVALID, 0,
|
||||
memorySeconds, vcoreSeconds,
|
||||
preemptedMemorySeconds, preemptedVcoreSeconds);
|
||||
}
|
||||
Credentials attemptTokens, long startTime,
|
||||
Map<String, Long> resourceSeondsMap,
|
||||
Map<String, Long> preemptedResourceSecondsMap) {
|
||||
return newInstance(attemptId, masterContainer, attemptTokens, startTime,
|
||||
null, "N/A", "", null, ContainerExitStatus.INVALID, 0,
|
||||
resourceSeondsMap, preemptedResourceSecondsMap);
|
||||
}
|
||||
|
||||
|
||||
public abstract ApplicationAttemptStateDataProto getProto();
|
||||
|
@ -215,4 +230,50 @@ public abstract class ApplicationAttemptStateData {
|
|||
@Public
|
||||
@Unstable
|
||||
public abstract void setPreemptedVcoreSeconds(long vcoreSeconds);
|
||||
|
||||
/**
|
||||
* Get the aggregated number of resources preempted that the application has
|
||||
* allocated times the number of seconds the application has been running.
|
||||
*
|
||||
* @return map containing the resource name and aggregated preempted
|
||||
* resource-seconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract Map<String, Long> getResourceSecondsMap();
|
||||
|
||||
/**
|
||||
* Set the aggregated number of resources that the application has
|
||||
* allocated times the number of seconds the application has been running.
|
||||
*
|
||||
* @param resourceSecondsMap map containing the resource name and aggregated
|
||||
* resource-seconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setResourceSecondsMap(
|
||||
Map<String, Long> resourceSecondsMap);
|
||||
|
||||
/**
|
||||
* Get the aggregated number of resources preempted that the application has
|
||||
* allocated times the number of seconds the application has been running.
|
||||
*
|
||||
* @return map containing the resource name and aggregated preempted
|
||||
* resource-seconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract Map<String, Long> getPreemptedResourceSecondsMap();
|
||||
|
||||
/**
|
||||
* Set the aggregated number of resources preempted that the application has
|
||||
* allocated times the number of seconds the application has been running.
|
||||
*
|
||||
* @param preemptedResourceSecondsMap map containing the resource name and
|
||||
* aggregated preempted resource-seconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setPreemptedResourceSecondsMap(
|
||||
Map<String, Long> preemptedResourceSecondsMap);
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -55,6 +56,9 @@ public class ApplicationAttemptStateDataPBImpl extends
|
|||
private Container masterContainer = null;
|
||||
private ByteBuffer appAttemptTokens = null;
|
||||
|
||||
private Map<String, Long> resourceSecondsMap;
|
||||
private Map<String, Long> preemptedResourceSecondsMap;
|
||||
|
||||
public ApplicationAttemptStateDataPBImpl() {
|
||||
builder = ApplicationAttemptStateDataProto.newBuilder();
|
||||
}
|
||||
|
@ -404,4 +408,50 @@ public class ApplicationAttemptStateDataPBImpl extends
|
|||
IOUtils.closeStream(dibb);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Long> getResourceSecondsMap() {
|
||||
if (this.resourceSecondsMap != null) {
|
||||
return this.resourceSecondsMap;
|
||||
}
|
||||
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||
this.resourceSecondsMap = ProtoUtils.convertStringLongMapProtoListToMap(
|
||||
p.getApplicationResourceUsageMapList());
|
||||
return this.resourceSecondsMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResourceSecondsMap(Map<String, Long> resourceSecondsMap) {
|
||||
maybeInitBuilder();
|
||||
builder.clearApplicationResourceUsageMap();
|
||||
this.resourceSecondsMap = resourceSecondsMap;
|
||||
if (resourceSecondsMap != null) {
|
||||
builder.addAllApplicationResourceUsageMap(
|
||||
ProtoUtils.convertMapToStringLongMapProtoList(resourceSecondsMap));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Long> getPreemptedResourceSecondsMap() {
|
||||
if (this.preemptedResourceSecondsMap != null) {
|
||||
return this.preemptedResourceSecondsMap;
|
||||
}
|
||||
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||
this.preemptedResourceSecondsMap = ProtoUtils
|
||||
.convertStringLongMapProtoListToMap(
|
||||
p.getApplicationResourceUsageMapList());
|
||||
return this.preemptedResourceSecondsMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPreemptedResourceSecondsMap(
|
||||
Map<String, Long> preemptedResourceSecondsMap) {
|
||||
maybeInitBuilder();
|
||||
builder.clearPreemptedResourceUsageMap();
|
||||
this.preemptedResourceSecondsMap = preemptedResourceSecondsMap;
|
||||
if (preemptedResourceSecondsMap != null) {
|
||||
builder.addAllPreemptedResourceUsageMap(ProtoUtils
|
||||
.convertMapToStringLongMapProtoList(preemptedResourceSecondsMap));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -754,14 +754,10 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
}
|
||||
|
||||
RMAppMetrics rmAppMetrics = getRMAppMetrics();
|
||||
appUsageReport.setMemorySeconds(rmAppMetrics.getMemorySeconds());
|
||||
appUsageReport.setVcoreSeconds(rmAppMetrics.getVcoreSeconds());
|
||||
appUsageReport.
|
||||
setPreemptedMemorySeconds(rmAppMetrics.
|
||||
getPreemptedMemorySeconds());
|
||||
appUsageReport.
|
||||
setPreemptedVcoreSeconds(rmAppMetrics.
|
||||
getPreemptedVcoreSeconds());
|
||||
appUsageReport
|
||||
.setResourceSecondsMap(rmAppMetrics.getResourceSecondsMap());
|
||||
appUsageReport.setPreemptedResourceSecondsMap(
|
||||
rmAppMetrics.getPreemptedResourceSecondsMap());
|
||||
}
|
||||
|
||||
if (currentApplicationAttemptId == null) {
|
||||
|
@ -1656,10 +1652,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
Resource resourcePreempted = Resource.newInstance(0, 0);
|
||||
int numAMContainerPreempted = 0;
|
||||
int numNonAMContainerPreempted = 0;
|
||||
long memorySeconds = 0;
|
||||
long vcoreSeconds = 0;
|
||||
long preemptedMemorySeconds = 0;
|
||||
long preemptedVcoreSeconds = 0;
|
||||
Map<String, Long> resourceSecondsMap = new HashMap<>();
|
||||
Map<String, Long> preemptedSecondsMap = new HashMap<>();
|
||||
|
||||
this.readLock.lock();
|
||||
try {
|
||||
for (RMAppAttempt attempt : attempts.values()) {
|
||||
|
@ -1675,20 +1670,28 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
// for both running and finished containers.
|
||||
AggregateAppResourceUsage resUsage =
|
||||
attempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
|
||||
memorySeconds += resUsage.getMemorySeconds();
|
||||
vcoreSeconds += resUsage.getVcoreSeconds();
|
||||
preemptedMemorySeconds += attemptMetrics.getPreemptedMemory();
|
||||
preemptedVcoreSeconds += attemptMetrics.getPreemptedVcore();
|
||||
for (Map.Entry<String, Long> entry : resUsage
|
||||
.getResourceUsageSecondsMap().entrySet()) {
|
||||
long value = RMServerUtils
|
||||
.getOrDefault(resourceSecondsMap, entry.getKey(), 0L);
|
||||
value += entry.getValue();
|
||||
resourceSecondsMap.put(entry.getKey(), value);
|
||||
}
|
||||
for (Map.Entry<String, Long> entry : attemptMetrics
|
||||
.getPreemptedResourceSecondsMap().entrySet()) {
|
||||
long value = RMServerUtils
|
||||
.getOrDefault(preemptedSecondsMap, entry.getKey(), 0L);
|
||||
value += entry.getValue();
|
||||
preemptedSecondsMap.put(entry.getKey(), value);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
|
||||
return new RMAppMetrics(resourcePreempted,
|
||||
numNonAMContainerPreempted, numAMContainerPreempted,
|
||||
memorySeconds, vcoreSeconds,
|
||||
preemptedMemorySeconds, preemptedVcoreSeconds);
|
||||
return new RMAppMetrics(resourcePreempted, numNonAMContainerPreempted,
|
||||
numAMContainerPreempted, resourceSecondsMap, preemptedSecondsMap);
|
||||
}
|
||||
|
||||
@Private
|
||||
|
|
|
@ -19,27 +19,27 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class RMAppMetrics {
|
||||
final Resource resourcePreempted;
|
||||
final int numNonAMContainersPreempted;
|
||||
final int numAMContainersPreempted;
|
||||
final long memorySeconds;
|
||||
final long vcoreSeconds;
|
||||
private final long preemptedMemorySeconds;
|
||||
private final long preemptedVcoreSeconds;
|
||||
private final Map<String, Long> resourceSecondsMap;
|
||||
private final Map<String, Long> preemptedResourceSecondsMap;
|
||||
|
||||
public RMAppMetrics(Resource resourcePreempted,
|
||||
int numNonAMContainersPreempted, int numAMContainersPreempted,
|
||||
long memorySeconds, long vcoreSeconds, long preemptedMemorySeconds,
|
||||
long preemptedVcoreSeconds) {
|
||||
Map<String, Long> resourceSecondsMap,
|
||||
Map<String, Long> preemptedResourceSecondsMap) {
|
||||
this.resourcePreempted = resourcePreempted;
|
||||
this.numNonAMContainersPreempted = numNonAMContainersPreempted;
|
||||
this.numAMContainersPreempted = numAMContainersPreempted;
|
||||
this.memorySeconds = memorySeconds;
|
||||
this.vcoreSeconds = vcoreSeconds;
|
||||
this.preemptedMemorySeconds = preemptedMemorySeconds;
|
||||
this.preemptedVcoreSeconds = preemptedVcoreSeconds;
|
||||
this.resourceSecondsMap = resourceSecondsMap;
|
||||
this.preemptedResourceSecondsMap = preemptedResourceSecondsMap;
|
||||
}
|
||||
|
||||
public Resource getResourcePreempted() {
|
||||
|
@ -55,19 +55,32 @@ public class RMAppMetrics {
|
|||
}
|
||||
|
||||
public long getMemorySeconds() {
|
||||
return memorySeconds;
|
||||
return RMServerUtils.getOrDefault(resourceSecondsMap,
|
||||
ResourceInformation.MEMORY_MB.getName(), 0L);
|
||||
}
|
||||
|
||||
public long getVcoreSeconds() {
|
||||
return vcoreSeconds;
|
||||
return RMServerUtils
|
||||
.getOrDefault(resourceSecondsMap, ResourceInformation.VCORES.getName(),
|
||||
0L);
|
||||
}
|
||||
|
||||
public long getPreemptedMemorySeconds() {
|
||||
return preemptedMemorySeconds;
|
||||
return RMServerUtils.getOrDefault(preemptedResourceSecondsMap,
|
||||
ResourceInformation.MEMORY_MB.getName(), 0L);
|
||||
}
|
||||
|
||||
public long getPreemptedVcoreSeconds() {
|
||||
return preemptedVcoreSeconds;
|
||||
return RMServerUtils.getOrDefault(preemptedResourceSecondsMap,
|
||||
ResourceInformation.VCORES.getName(), 0L);
|
||||
}
|
||||
|
||||
public Map<String, Long> getResourceSecondsMap() {
|
||||
return resourceSecondsMap;
|
||||
}
|
||||
|
||||
public Map<String, Long> getPreemptedResourceSecondsMap() {
|
||||
return preemptedResourceSecondsMap;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,42 +19,38 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Private
|
||||
public class AggregateAppResourceUsage {
|
||||
long memorySeconds;
|
||||
long vcoreSeconds;
|
||||
private Map<String, Long> resourceSecondsMap = new HashMap<>();
|
||||
|
||||
public AggregateAppResourceUsage(long memorySeconds, long vcoreSeconds) {
|
||||
this.memorySeconds = memorySeconds;
|
||||
this.vcoreSeconds = vcoreSeconds;
|
||||
public AggregateAppResourceUsage(Map<String, Long> resourceSecondsMap) {
|
||||
this.resourceSecondsMap.putAll(resourceSecondsMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the memorySeconds
|
||||
*/
|
||||
public long getMemorySeconds() {
|
||||
return memorySeconds;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param memorySeconds the memorySeconds to set
|
||||
*/
|
||||
public void setMemorySeconds(long memorySeconds) {
|
||||
this.memorySeconds = memorySeconds;
|
||||
return RMServerUtils.getOrDefault(resourceSecondsMap,
|
||||
ResourceInformation.MEMORY_MB.getName(), 0L);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the vcoreSeconds
|
||||
*/
|
||||
public long getVcoreSeconds() {
|
||||
return vcoreSeconds;
|
||||
return RMServerUtils
|
||||
.getOrDefault(resourceSecondsMap, ResourceInformation.VCORES.getName(),
|
||||
0L);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param vcoreSeconds the vcoreSeconds to set
|
||||
*/
|
||||
public void setVcoreSeconds(long vcoreSeconds) {
|
||||
this.vcoreSeconds = vcoreSeconds;
|
||||
public Map<String, Long> getResourceUsageSecondsMap() {
|
||||
return resourceSecondsMap;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -939,12 +939,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
}
|
||||
AggregateAppResourceUsage resUsage =
|
||||
this.attemptMetrics.getAggregateAppResourceUsage();
|
||||
report.setMemorySeconds(resUsage.getMemorySeconds());
|
||||
report.setVcoreSeconds(resUsage.getVcoreSeconds());
|
||||
report.setPreemptedMemorySeconds(
|
||||
this.attemptMetrics.getPreemptedMemory());
|
||||
report.setPreemptedVcoreSeconds(
|
||||
this.attemptMetrics.getPreemptedVcore());
|
||||
report.setResourceSecondsMap(resUsage.getResourceUsageSecondsMap());
|
||||
report.setPreemptedResourceSecondsMap(
|
||||
this.attemptMetrics.getPreemptedResourceSecondsMap());
|
||||
return report;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
|
@ -981,11 +978,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
this.finalStatus = attemptState.getFinalApplicationStatus();
|
||||
this.startTime = attemptState.getStartTime();
|
||||
this.finishTime = attemptState.getFinishTime();
|
||||
this.attemptMetrics.updateAggregateAppResourceUsage(
|
||||
attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
|
||||
this.attemptMetrics
|
||||
.updateAggregateAppResourceUsage(attemptState.getResourceSecondsMap());
|
||||
this.attemptMetrics.updateAggregatePreemptedAppResourceUsage(
|
||||
attemptState.getPreemptedMemorySeconds(),
|
||||
attemptState.getPreemptedVcoreSeconds());
|
||||
attemptState.getPreemptedResourceSecondsMap());
|
||||
}
|
||||
|
||||
public void transferStateFromAttempt(RMAppAttempt attempt) {
|
||||
|
@ -1361,16 +1357,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
RMStateStore rmStore = rmContext.getStateStore();
|
||||
setFinishTime(System.currentTimeMillis());
|
||||
|
||||
ApplicationAttemptStateData attemptState =
|
||||
ApplicationAttemptStateData.newInstance(
|
||||
applicationAttemptId, getMasterContainer(),
|
||||
rmStore.getCredentialsFromAppAttempt(this),
|
||||
startTime, stateToBeStored, finalTrackingUrl, diags.toString(),
|
||||
finalStatus, exitStatus,
|
||||
getFinishTime(), resUsage.getMemorySeconds(),
|
||||
resUsage.getVcoreSeconds(),
|
||||
this.attemptMetrics.getPreemptedMemory(),
|
||||
this.attemptMetrics.getPreemptedVcore());
|
||||
ApplicationAttemptStateData attemptState = ApplicationAttemptStateData
|
||||
.newInstance(applicationAttemptId, getMasterContainer(),
|
||||
rmStore.getCredentialsFromAppAttempt(this), startTime,
|
||||
stateToBeStored, finalTrackingUrl, diags.toString(), finalStatus, exitStatus,
|
||||
getFinishTime(), resUsage.getResourceUsageSecondsMap(),
|
||||
this.attemptMetrics.getPreemptedResourceSecondsMap());
|
||||
LOG.info("Updating application attempt " + applicationAttemptId
|
||||
+ " with final state: " + targetedFinalState + ", and exit status: "
|
||||
+ exitStatus);
|
||||
|
@ -1832,9 +1824,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
appAttempt.startTime, appAttempt.recoveredFinalState,
|
||||
newTrackingUrl, appAttempt.getDiagnostics(), null,
|
||||
ContainerExitStatus.INVALID, appAttempt.getFinishTime(),
|
||||
resUsage.getMemorySeconds(), resUsage.getVcoreSeconds(),
|
||||
appAttempt.attemptMetrics.getPreemptedMemory(),
|
||||
appAttempt.attemptMetrics.getPreemptedVcore());
|
||||
appAttempt.attemptMetrics.getAggregateAppResourceUsage()
|
||||
.getResourceUsageSecondsMap(),
|
||||
appAttempt.attemptMetrics.getPreemptedResourceSecondsMap());
|
||||
appAttempt.rmContext.getStateStore()
|
||||
.updateApplicationAttemptState(attemptState);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -25,11 +27,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.apache.commons.lang.time.DateUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
@ -49,10 +53,8 @@ public class RMAppAttemptMetrics {
|
|||
|
||||
private ReadLock readLock;
|
||||
private WriteLock writeLock;
|
||||
private AtomicLong finishedMemorySeconds = new AtomicLong(0);
|
||||
private AtomicLong finishedVcoreSeconds = new AtomicLong(0);
|
||||
private AtomicLong preemptedMemorySeconds = new AtomicLong(0);
|
||||
private AtomicLong preemptedVcoreSeconds = new AtomicLong(0);
|
||||
private Map<String, AtomicLong> resourceUsageMap = new HashMap<>();
|
||||
private Map<String, AtomicLong> preemptedResourceMap = new HashMap<>();
|
||||
private RMContext rmContext;
|
||||
|
||||
private int[][] localityStatistics =
|
||||
|
@ -102,11 +104,16 @@ public class RMAppAttemptMetrics {
|
|||
}
|
||||
|
||||
public long getPreemptedMemory() {
|
||||
return preemptedMemorySeconds.get();
|
||||
return preemptedResourceMap.get(ResourceInformation.MEMORY_MB.getName())
|
||||
.get();
|
||||
}
|
||||
|
||||
public long getPreemptedVcore() {
|
||||
return preemptedVcoreSeconds.get();
|
||||
return preemptedResourceMap.get(ResourceInformation.VCORES.getName()).get();
|
||||
}
|
||||
|
||||
public Map<String, Long> getPreemptedResourceSecondsMap() {
|
||||
return convertAtomicLongMaptoLongMap(preemptedResourceMap);
|
||||
}
|
||||
|
||||
public int getNumNonAMContainersPreempted() {
|
||||
|
@ -122,35 +129,90 @@ public class RMAppAttemptMetrics {
|
|||
}
|
||||
|
||||
public AggregateAppResourceUsage getAggregateAppResourceUsage() {
|
||||
long memorySeconds = finishedMemorySeconds.get();
|
||||
long vcoreSeconds = finishedVcoreSeconds.get();
|
||||
Map<String, Long> resourcesUsed =
|
||||
convertAtomicLongMaptoLongMap(resourceUsageMap);
|
||||
|
||||
// Only add in the running containers if this is the active attempt.
|
||||
RMApp rmApp = rmContext.getRMApps().get(attemptId.getApplicationId());
|
||||
if (null != rmApp) {
|
||||
RMAppAttempt currentAttempt = rmApp.getCurrentAppAttempt();
|
||||
if (rmApp != null) {
|
||||
RMAppAttempt currentAttempt = rmContext.getRMApps().get(attemptId.getApplicationId()).getCurrentAppAttempt();
|
||||
if (currentAttempt.getAppAttemptId().equals(attemptId)) {
|
||||
ApplicationResourceUsageReport appResUsageReport = rmContext
|
||||
.getScheduler().getAppResourceUsageReport(attemptId);
|
||||
ApplicationResourceUsageReport appResUsageReport =
|
||||
rmContext.getScheduler().getAppResourceUsageReport(attemptId);
|
||||
if (appResUsageReport != null) {
|
||||
memorySeconds += appResUsageReport.getMemorySeconds();
|
||||
vcoreSeconds += appResUsageReport.getVcoreSeconds();
|
||||
Map<String, Long> tmp = appResUsageReport.getResourceSecondsMap();
|
||||
for (Map.Entry<String, Long> entry : tmp.entrySet()) {
|
||||
if (resourcesUsed.containsKey(entry.getKey())) {
|
||||
Long value = resourcesUsed.get(entry.getKey());
|
||||
value += entry.getValue();
|
||||
resourcesUsed.put(entry.getKey(), value);
|
||||
} else{
|
||||
resourcesUsed.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return new AggregateAppResourceUsage(memorySeconds, vcoreSeconds);
|
||||
return new AggregateAppResourceUsage(resourcesUsed);
|
||||
}
|
||||
|
||||
public void updateAggregateAppResourceUsage(long finishedMemorySeconds,
|
||||
long finishedVcoreSeconds) {
|
||||
this.finishedMemorySeconds.addAndGet(finishedMemorySeconds);
|
||||
this.finishedVcoreSeconds.addAndGet(finishedVcoreSeconds);
|
||||
public void updateAggregateAppResourceUsage(Resource allocated,
|
||||
long deltaUsedMillis) {
|
||||
updateUsageMap(allocated, deltaUsedMillis, resourceUsageMap);
|
||||
}
|
||||
|
||||
public void updateAggregatePreemptedAppResourceUsage(Resource allocated,
|
||||
long deltaUsedMillis) {
|
||||
updateUsageMap(allocated, deltaUsedMillis, preemptedResourceMap);
|
||||
}
|
||||
|
||||
public void updateAggregateAppResourceUsage(
|
||||
Map<String, Long> resourceSecondsMap) {
|
||||
updateUsageMap(resourceSecondsMap, resourceUsageMap);
|
||||
}
|
||||
|
||||
public void updateAggregatePreemptedAppResourceUsage(
|
||||
long preemptedMemorySeconds, long preemptedVcoreSeconds) {
|
||||
this.preemptedMemorySeconds.addAndGet(preemptedMemorySeconds);
|
||||
this.preemptedVcoreSeconds.addAndGet(preemptedVcoreSeconds);
|
||||
Map<String, Long> preemptedResourceSecondsMap) {
|
||||
updateUsageMap(preemptedResourceSecondsMap, preemptedResourceMap);
|
||||
}
|
||||
|
||||
private void updateUsageMap(Resource allocated, long deltaUsedMillis,
|
||||
Map<String, AtomicLong> targetMap) {
|
||||
for (Map.Entry<String, ResourceInformation> entry : allocated.getResources()
|
||||
.entrySet()) {
|
||||
AtomicLong resourceUsed;
|
||||
if (!targetMap.containsKey(entry.getKey())) {
|
||||
resourceUsed = new AtomicLong(0);
|
||||
targetMap.put(entry.getKey(), resourceUsed);
|
||||
|
||||
}
|
||||
resourceUsed = targetMap.get(entry.getKey());
|
||||
resourceUsed.addAndGet((entry.getValue().getValue() * deltaUsedMillis)
|
||||
/ DateUtils.MILLIS_PER_SECOND);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateUsageMap(Map<String, Long> sourceMap,
|
||||
Map<String, AtomicLong> targetMap) {
|
||||
for (Map.Entry<String, Long> entry : sourceMap.entrySet()) {
|
||||
AtomicLong resourceUsed;
|
||||
if (!targetMap.containsKey(entry.getKey())) {
|
||||
resourceUsed = new AtomicLong(0);
|
||||
targetMap.put(entry.getKey(), resourceUsed);
|
||||
|
||||
}
|
||||
resourceUsed = targetMap.get(entry.getKey());
|
||||
resourceUsed.set(entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, Long> convertAtomicLongMaptoLongMap(
|
||||
Map<String, AtomicLong> source) {
|
||||
Map<String, Long> ret = new HashMap<>();
|
||||
for (Map.Entry<String, AtomicLong> entry : source.entrySet()) {
|
||||
ret.put(entry.getKey(), entry.getValue().get());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public void incNumAllocatedContainers(NodeType containerType,
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.apache.commons.lang.time.DateUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -716,20 +715,15 @@ public class RMContainerImpl implements RMContainer {
|
|||
|
||||
if (rmAttempt != null) {
|
||||
long usedMillis = container.finishTime - container.creationTime;
|
||||
long memorySeconds = resource.getMemorySize()
|
||||
* usedMillis / DateUtils.MILLIS_PER_SECOND;
|
||||
long vcoreSeconds = resource.getVirtualCores()
|
||||
* usedMillis / DateUtils.MILLIS_PER_SECOND;
|
||||
rmAttempt.getRMAppAttemptMetrics()
|
||||
.updateAggregateAppResourceUsage(memorySeconds,vcoreSeconds);
|
||||
.updateAggregateAppResourceUsage(resource, usedMillis);
|
||||
// If this is a preempted container, update preemption metrics
|
||||
if (ContainerExitStatus.PREEMPTED == container.finishedStatus
|
||||
.getExitStatus()) {
|
||||
rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
|
||||
container);
|
||||
.getExitStatus()) {
|
||||
rmAttempt.getRMAppAttemptMetrics()
|
||||
.updateAggregatePreemptedAppResourceUsage(memorySeconds,
|
||||
vcoreSeconds);
|
||||
.updatePreemptionInfo(resource, container);
|
||||
rmAttempt.getRMAppAttemptMetrics()
|
||||
.updateAggregatePreemptedAppResourceUsage(resource, usedMillis);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,11 +55,13 @@ import org.apache.hadoop.yarn.api.records.NMToken;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
|
||||
|
@ -107,9 +109,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
|
||||
private static final long MEM_AGGREGATE_ALLOCATION_CACHE_MSECS = 3000;
|
||||
protected long lastMemoryAggregateAllocationUpdateTime = 0;
|
||||
private long lastMemorySeconds = 0;
|
||||
private long lastVcoreSeconds = 0;
|
||||
|
||||
private Map<String, Long> lastResourceSecondsMap = new HashMap<>();
|
||||
protected final AppSchedulingInfo appSchedulingInfo;
|
||||
protected ApplicationAttemptId attemptId;
|
||||
protected Map<ContainerId, RMContainer> liveContainers =
|
||||
|
@ -1002,22 +1002,24 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
// recently.
|
||||
if ((currentTimeMillis - lastMemoryAggregateAllocationUpdateTime)
|
||||
> MEM_AGGREGATE_ALLOCATION_CACHE_MSECS) {
|
||||
long memorySeconds = 0;
|
||||
long vcoreSeconds = 0;
|
||||
Map<String, Long> resourceSecondsMap = new HashMap<>();
|
||||
for (RMContainer rmContainer : this.liveContainers.values()) {
|
||||
long usedMillis = currentTimeMillis - rmContainer.getCreationTime();
|
||||
Resource resource = rmContainer.getContainer().getResource();
|
||||
memorySeconds += resource.getMemorySize() * usedMillis /
|
||||
DateUtils.MILLIS_PER_SECOND;
|
||||
vcoreSeconds += resource.getVirtualCores() * usedMillis
|
||||
/ DateUtils.MILLIS_PER_SECOND;
|
||||
for (Map.Entry<String, ResourceInformation> entry : resource
|
||||
.getResources().entrySet()) {
|
||||
long value = RMServerUtils
|
||||
.getOrDefault(resourceSecondsMap, entry.getKey(), 0L);
|
||||
value += entry.getValue().getValue() * usedMillis
|
||||
/ DateUtils.MILLIS_PER_SECOND;
|
||||
resourceSecondsMap.put(entry.getKey(), value);
|
||||
}
|
||||
}
|
||||
|
||||
lastMemoryAggregateAllocationUpdateTime = currentTimeMillis;
|
||||
lastMemorySeconds = memorySeconds;
|
||||
lastVcoreSeconds = vcoreSeconds;
|
||||
lastResourceSecondsMap = resourceSecondsMap;
|
||||
}
|
||||
return new AggregateAppResourceUsage(lastMemorySeconds, lastVcoreSeconds);
|
||||
return new AggregateAppResourceUsage(lastResourceSecondsMap);
|
||||
}
|
||||
|
||||
public ApplicationResourceUsageReport getResourceUsageReport() {
|
||||
|
@ -1032,6 +1034,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
Resource cluster = rmContext.getScheduler().getClusterResource();
|
||||
ResourceCalculator calc =
|
||||
rmContext.getScheduler().getResourceCalculator();
|
||||
Map<String, Long> preemptedResourceSecondsMaps = new HashMap<>();
|
||||
preemptedResourceSecondsMaps
|
||||
.put(ResourceInformation.MEMORY_MB.getName(), 0L);
|
||||
preemptedResourceSecondsMaps
|
||||
.put(ResourceInformation.VCORES.getName(), 0L);
|
||||
float queueUsagePerc = 0.0f;
|
||||
float clusterUsagePerc = 0.0f;
|
||||
if (!calc.isInvalidDivisor(cluster)) {
|
||||
|
@ -1042,15 +1049,15 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
if (Float.isNaN(queueUsagePerc) || Float.isInfinite(queueUsagePerc)) {
|
||||
queueUsagePerc = 0.0f;
|
||||
}
|
||||
clusterUsagePerc = calc.divide(cluster, usedResourceClone, cluster)
|
||||
* 100;
|
||||
clusterUsagePerc =
|
||||
calc.divide(cluster, usedResourceClone, cluster) * 100;
|
||||
}
|
||||
return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
|
||||
reservedContainers.size(), usedResourceClone, reservedResourceClone,
|
||||
Resources.add(usedResourceClone, reservedResourceClone),
|
||||
runningResourceUsage.getMemorySeconds(),
|
||||
runningResourceUsage.getVcoreSeconds(), queueUsagePerc,
|
||||
clusterUsagePerc, 0, 0);
|
||||
return ApplicationResourceUsageReport
|
||||
.newInstance(liveContainers.size(), reservedContainers.size(),
|
||||
usedResourceClone, reservedResourceClone,
|
||||
Resources.add(usedResourceClone, reservedResourceClone),
|
||||
runningResourceUsage.getResourceUsageSecondsMap(), queueUsagePerc,
|
||||
clusterUsagePerc, preemptedResourceSecondsMaps);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.AppBlock;
|
||||
import org.apache.hadoop.yarn.util.StringHelper;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
|
||||
|
@ -106,15 +107,12 @@ public class RMAppBlock extends AppBlock{
|
|||
attemptResourcePreempted)
|
||||
._("Number of Non-AM Containers Preempted from Current Attempt:",
|
||||
attemptNumNonAMContainerPreempted)
|
||||
._("Aggregate Resource Allocation:",
|
||||
String.format("%d MB-seconds, %d vcore-seconds",
|
||||
appMetrics == null ? "N/A" : appMetrics.getMemorySeconds(),
|
||||
appMetrics == null ? "N/A" : appMetrics.getVcoreSeconds()))
|
||||
._("Aggregate Resource Allocation:", appMetrics == null ? "N/A" :
|
||||
StringHelper
|
||||
.getResourceSecondsString(appMetrics.getResourceSecondsMap()))
|
||||
._("Aggregate Preempted Resource Allocation:",
|
||||
String.format("%d MB-seconds, %d vcore-seconds",
|
||||
appMetrics == null ? "N/A" : appMetrics.getPreemptedMemorySeconds(),
|
||||
appMetrics == null ? "N/A" :
|
||||
appMetrics.getPreemptedVcoreSeconds()));
|
||||
appMetrics == null ? "N/A" : StringHelper.getResourceSecondsString(
|
||||
appMetrics.getPreemptedResourceSecondsMap()));
|
||||
|
||||
pdiv._();
|
||||
}
|
||||
|
|
|
@ -102,6 +102,7 @@ public class AppInfo {
|
|||
private long vcoreSeconds;
|
||||
protected float queueUsagePercentage;
|
||||
protected float clusterUsagePercentage;
|
||||
protected Map<String, Long> resourceSecondsMap;
|
||||
|
||||
// preemption info fields
|
||||
private long preemptedResourceMB;
|
||||
|
@ -110,6 +111,7 @@ public class AppInfo {
|
|||
private int numAMContainerPreempted;
|
||||
private long preemptedMemorySeconds;
|
||||
private long preemptedVcoreSeconds;
|
||||
protected Map<String, Long> preemptedResourceSecondsMap;
|
||||
|
||||
// list of resource requests
|
||||
@XmlElement(name = "resourceRequests")
|
||||
|
@ -236,8 +238,10 @@ public class AppInfo {
|
|||
appMetrics.getResourcePreempted().getVirtualCores();
|
||||
memorySeconds = appMetrics.getMemorySeconds();
|
||||
vcoreSeconds = appMetrics.getVcoreSeconds();
|
||||
resourceSecondsMap = appMetrics.getResourceSecondsMap();
|
||||
preemptedMemorySeconds = appMetrics.getPreemptedMemorySeconds();
|
||||
preemptedVcoreSeconds = appMetrics.getPreemptedVcoreSeconds();
|
||||
preemptedResourceSecondsMap = appMetrics.getPreemptedResourceSecondsMap();
|
||||
ApplicationSubmissionContext appSubmissionContext =
|
||||
app.getApplicationSubmissionContext();
|
||||
unmanagedApplication = appSubmissionContext.getUnmanagedAM();
|
||||
|
@ -460,6 +464,22 @@ public class AppInfo {
|
|||
return this.reservedVCores;
|
||||
}
|
||||
|
||||
public long getPreemptedMB() {
|
||||
return preemptedResourceMB;
|
||||
}
|
||||
|
||||
public long getPreemptedVCores() {
|
||||
return preemptedResourceVCores;
|
||||
}
|
||||
|
||||
public int getNumNonAMContainersPreempted() {
|
||||
return numNonAMContainerPreempted;
|
||||
}
|
||||
|
||||
public int getNumAMContainersPreempted() {
|
||||
return numAMContainerPreempted;
|
||||
}
|
||||
|
||||
public long getMemorySeconds() {
|
||||
return memorySeconds;
|
||||
}
|
||||
|
@ -468,6 +488,10 @@ public class AppInfo {
|
|||
return vcoreSeconds;
|
||||
}
|
||||
|
||||
public Map<String, Long> getResourceSecondsMap() {
|
||||
return resourceSecondsMap;
|
||||
}
|
||||
|
||||
public long getPreemptedMemorySeconds() {
|
||||
return preemptedMemorySeconds;
|
||||
}
|
||||
|
@ -476,6 +500,10 @@ public class AppInfo {
|
|||
return preemptedVcoreSeconds;
|
||||
}
|
||||
|
||||
public Map<String, Long> getPreemptedResourceSecondsMap() {
|
||||
return preemptedResourceSecondsMap;
|
||||
}
|
||||
|
||||
public List<ResourceRequestInfo> getResourceRequests() {
|
||||
return this.resourceRequests;
|
||||
}
|
||||
|
|
|
@ -20,46 +20,68 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
|||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
@XmlRootElement
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
@XmlAccessorType(XmlAccessType.NONE)
|
||||
public class ResourceInfo {
|
||||
|
||||
@XmlElement
|
||||
long memory;
|
||||
@XmlElement
|
||||
int vCores;
|
||||
|
||||
|
||||
private Resource resources;
|
||||
|
||||
public ResourceInfo() {
|
||||
}
|
||||
|
||||
public ResourceInfo(Resource res) {
|
||||
memory = res.getMemorySize();
|
||||
vCores = res.getVirtualCores();
|
||||
resources = Resources.clone(res);
|
||||
}
|
||||
|
||||
public long getMemorySize() {
|
||||
return memory;
|
||||
if (resources == null) {
|
||||
resources = Resource.newInstance(memory, vCores);
|
||||
}
|
||||
return resources.getMemorySize();
|
||||
}
|
||||
|
||||
public int getvCores() {
|
||||
return vCores;
|
||||
if (resources == null) {
|
||||
resources = Resource.newInstance(memory, vCores);
|
||||
}
|
||||
return resources.getVirtualCores();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "<memory:" + memory + ", vCores:" + vCores + ">";
|
||||
return resources.toString();
|
||||
}
|
||||
|
||||
public void setMemory(int memory) {
|
||||
if (resources == null) {
|
||||
resources = Resource.newInstance(memory, vCores);
|
||||
}
|
||||
this.memory = memory;
|
||||
resources.setMemorySize(memory);
|
||||
}
|
||||
|
||||
public void setvCores(int vCores) {
|
||||
if (resources == null) {
|
||||
resources = Resource.newInstance(memory, vCores);
|
||||
}
|
||||
this.vCores = vCores;
|
||||
resources.setVirtualCores(vCores);
|
||||
}
|
||||
|
||||
public Resource getResource() {
|
||||
return Resource.newInstance(memory, vCores);
|
||||
return Resource.newInstance(resources);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,7 +73,7 @@ public class SchedulerInfo {
|
|||
}
|
||||
|
||||
public String getSchedulerResourceTypes() {
|
||||
return this.schedulingResourceTypes.toString();
|
||||
return minAllocResource.getResource().getResources().keySet().toString();
|
||||
}
|
||||
|
||||
public int getMaxClusterLevelAppPriority() {
|
||||
|
|
|
@ -88,6 +88,8 @@ message ApplicationAttemptStateDataProto {
|
|||
optional int64 finish_time = 12;
|
||||
optional int64 preempted_memory_seconds = 13;
|
||||
optional int64 preempted_vcore_seconds = 14;
|
||||
repeated StringLongMapProto application_resource_usage_map = 15;
|
||||
repeated StringLongMapProto preempted_resource_usage_map = 16;
|
||||
}
|
||||
|
||||
message EpochProto {
|
||||
|
|
|
@ -35,6 +35,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
|
|||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
|
@ -828,9 +830,12 @@ public class TestAppManager{
|
|||
when(app.getApplicationType()).thenReturn("MAPREDUCE");
|
||||
when(app.getSubmitTime()).thenReturn(1000L);
|
||||
when(app.getLaunchTime()).thenReturn(2000L);
|
||||
Map<String, Long> resourceSecondsMap = new HashMap<>();
|
||||
resourceSecondsMap.put(ResourceInformation.MEMORY_MB.getName(), 16384L);
|
||||
resourceSecondsMap.put(ResourceInformation.VCORES.getName(), 64L);
|
||||
RMAppMetrics metrics =
|
||||
new RMAppMetrics(Resource.newInstance(1234, 56),
|
||||
10, 1, 16384, 64, 0, 0);
|
||||
10, 1, resourceSecondsMap, new HashMap<String, Long>());
|
||||
when(app.getRMAppMetrics()).thenReturn(metrics);
|
||||
|
||||
RMAppManager.ApplicationSummary.SummaryBuilder summary =
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.lang.time.DateUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -32,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
|
@ -424,6 +426,9 @@ public class TestContainerResourceUsage {
|
|||
* usedMillis / DateUtils.MILLIS_PER_SECOND;
|
||||
long vcoreSeconds = resource.getVirtualCores()
|
||||
* usedMillis / DateUtils.MILLIS_PER_SECOND;
|
||||
return new AggregateAppResourceUsage(memorySeconds, vcoreSeconds);
|
||||
Map<String, Long> map = new HashMap<>();
|
||||
map.put(ResourceInformation.MEMORY_MB.getName(), memorySeconds);
|
||||
map.put(ResourceInformation.VCORES.getName(), vcoreSeconds);
|
||||
return new AggregateAppResourceUsage(map);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -194,7 +195,8 @@ public abstract class MockAsm extends MockApps {
|
|||
|
||||
@Override
|
||||
public RMAppMetrics getRMAppMetrics() {
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0, 0, 0);
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, new HashMap<String, Long>(),
|
||||
new HashMap<String, Long>());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -354,8 +356,9 @@ public abstract class MockAsm extends MockApps {
|
|||
public ApplicationReport createAndGetApplicationReport(
|
||||
String clientUserName, boolean allowAccess) {
|
||||
ApplicationResourceUsageReport usageReport =
|
||||
ApplicationResourceUsageReport.newInstance(0, 0, null, null, null,
|
||||
0, 0, 0, 0, 0, 0);
|
||||
ApplicationResourceUsageReport
|
||||
.newInstance(0, 0, null, null, null, new HashMap<String, Long>(), 0, 0,
|
||||
new HashMap<String, Long>());
|
||||
ApplicationReport report = ApplicationReport.newInstance(
|
||||
getApplicationId(), appAttemptId, getUser(), getQueue(),
|
||||
getName(), null, 0, null, null, getDiagnostics().toString(),
|
||||
|
|
|
@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
|
@ -505,9 +507,16 @@ public class TestSystemMetricsPublisher {
|
|||
when(app.getCurrentAppAttempt()).thenReturn(appAttempt);
|
||||
when(app.getFinalApplicationStatus()).thenReturn(
|
||||
FinalApplicationStatus.UNDEFINED);
|
||||
when(app.getRMAppMetrics()).thenReturn(
|
||||
new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
|
||||
Integer.MAX_VALUE, Long.MAX_VALUE));
|
||||
Map<String, Long> resourceMap = new HashMap<>();
|
||||
resourceMap
|
||||
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
|
||||
resourceMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
|
||||
Map<String, Long> preemptedMap = new HashMap<>();
|
||||
preemptedMap
|
||||
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
|
||||
preemptedMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
|
||||
when(app.getRMAppMetrics())
|
||||
.thenReturn(new RMAppMetrics(null, 0, 0, resourceMap, preemptedMap));
|
||||
Set<String> appTags = new HashSet<String>();
|
||||
appTags.add("test");
|
||||
appTags.add("tags");
|
||||
|
|
|
@ -29,6 +29,8 @@ import java.io.File;
|
|||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
|
@ -46,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
|
@ -358,15 +361,20 @@ public class TestSystemMetricsPublisherForV2 {
|
|||
when(app.getDiagnostics()).thenReturn(
|
||||
new StringBuilder("test diagnostics info"));
|
||||
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
|
||||
when(appAttempt.getAppAttemptId()).thenReturn(
|
||||
ApplicationAttemptId.newInstance(appId, 1));
|
||||
when(appAttempt.getAppAttemptId())
|
||||
.thenReturn(ApplicationAttemptId.newInstance(appId, 1));
|
||||
when(app.getCurrentAppAttempt()).thenReturn(appAttempt);
|
||||
when(app.getFinalApplicationStatus()).thenReturn(
|
||||
FinalApplicationStatus.UNDEFINED);
|
||||
when(app.getFinalApplicationStatus())
|
||||
.thenReturn(FinalApplicationStatus.UNDEFINED);
|
||||
Map<String, Long> resourceSecondsMap = new HashMap<>();
|
||||
resourceSecondsMap
|
||||
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
|
||||
resourceSecondsMap
|
||||
.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
|
||||
when(app.getRMAppMetrics()).thenReturn(
|
||||
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, Integer.MAX_VALUE,
|
||||
Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE));
|
||||
when(app.getApplicationTags()).thenReturn(Collections.<String> emptySet());
|
||||
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, resourceSecondsMap,
|
||||
new HashMap<String, Long>()));
|
||||
when(app.getApplicationTags()).thenReturn(Collections.<String>emptySet());
|
||||
ApplicationSubmissionContext appSubmissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
when(appSubmissionContext.getPriority())
|
||||
|
|
|
@ -194,7 +194,7 @@ public class RMStateStoreTestBase {
|
|||
when(mockAttempt.getRMAppAttemptMetrics())
|
||||
.thenReturn(mockRmAppAttemptMetrics);
|
||||
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
|
||||
.thenReturn(new AggregateAppResourceUsage(0, 0));
|
||||
.thenReturn(new AggregateAppResourceUsage(new HashMap<String, Long>()));
|
||||
dispatcher.attemptId = attemptId;
|
||||
store.storeNewApplicationAttempt(mockAttempt);
|
||||
waitNotify(dispatcher);
|
||||
|
@ -292,7 +292,7 @@ public class RMStateStoreTestBase {
|
|||
when(mockRemovedAttempt.getRMAppAttemptMetrics())
|
||||
.thenReturn(mockRmAppAttemptMetrics);
|
||||
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
|
||||
.thenReturn(new AggregateAppResourceUsage(0,0));
|
||||
.thenReturn(new AggregateAppResourceUsage(new HashMap<String, Long>()));
|
||||
attempts.put(attemptIdRemoved, mockRemovedAttempt);
|
||||
store.removeApplication(mockRemovedApp);
|
||||
|
||||
|
@ -369,7 +369,7 @@ public class RMStateStoreTestBase {
|
|||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics",
|
||||
FinalApplicationStatus.SUCCEEDED, 100,
|
||||
oldAttemptState.getFinishTime(), 0, 0, 0, 0);
|
||||
oldAttemptState.getFinishTime(), new HashMap<String, Long>(), new HashMap<String, Long>());
|
||||
store.updateApplicationAttemptState(newAttemptState);
|
||||
|
||||
// test updating the state of an app/attempt whose initial state was not
|
||||
|
@ -393,7 +393,7 @@ public class RMStateStoreTestBase {
|
|||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics",
|
||||
FinalApplicationStatus.SUCCEEDED, 111,
|
||||
oldAttemptState.getFinishTime(), 0, 0, 0, 0);
|
||||
oldAttemptState.getFinishTime(), new HashMap<String, Long>(), new HashMap<String, Long>());
|
||||
store.updateApplicationAttemptState(dummyAttempt);
|
||||
|
||||
// let things settle down
|
||||
|
|
|
@ -34,12 +34,7 @@ import org.apache.hadoop.security.token.Token;
|
|||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.*;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
|
@ -554,7 +549,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||
when(mockAttempt.getRMAppAttemptMetrics())
|
||||
.thenReturn(mockRmAppAttemptMetrics);
|
||||
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
|
||||
.thenReturn(new AggregateAppResourceUsage(0,0));
|
||||
.thenReturn(new AggregateAppResourceUsage(new HashMap<String, Long>()));
|
||||
store.storeNewApplicationAttempt(mockAttempt);
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
@ -566,7 +561,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||
store.getCredentialsFromAppAttempt(mockAttempt),
|
||||
startTime, RMAppAttemptState.FINISHED, "testUrl",
|
||||
"test", FinalApplicationStatus.SUCCEEDED, 100,
|
||||
finishTime, 0, 0, 0, 0);
|
||||
finishTime, new HashMap<String, Long>(), new HashMap<String, Long>());
|
||||
store.updateApplicationAttemptState(newAttemptState);
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
@ -803,10 +798,20 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||
private static ApplicationAttemptStateData createFinishedAttempt(
|
||||
ApplicationAttemptId attemptId, Container container, long startTime,
|
||||
int amExitStatus) {
|
||||
Map<String, Long> resourceSecondsMap = new HashMap<>();
|
||||
Map<String, Long> preemptedResoureSecondsMap = new HashMap<>();
|
||||
resourceSecondsMap
|
||||
.put(ResourceInformation.MEMORY_MB.getName(), 0L);
|
||||
resourceSecondsMap
|
||||
.put(ResourceInformation.VCORES.getName(), 0L);
|
||||
preemptedResoureSecondsMap.put(ResourceInformation.MEMORY_MB.getName(),
|
||||
0L);
|
||||
preemptedResoureSecondsMap
|
||||
.put(ResourceInformation.VCORES.getName(), 0L);
|
||||
return ApplicationAttemptStateData.newInstance(attemptId,
|
||||
container, null, startTime, RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
|
||||
amExitStatus, 0, 0, 0, 0, 0);
|
||||
amExitStatus, 0, resourceSecondsMap, preemptedResoureSecondsMap);
|
||||
}
|
||||
|
||||
private ApplicationAttemptId storeAttempt(RMStateStore store,
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
|
@ -62,9 +63,10 @@ public class TestAppPage {
|
|||
when(app.getStartTime()).thenReturn(0L);
|
||||
when(app.getFinishTime()).thenReturn(0L);
|
||||
when(app.createApplicationState()).thenReturn(YarnApplicationState.FAILED);
|
||||
|
||||
RMAppMetrics appMetrics = new RMAppMetrics(
|
||||
Resource.newInstance(0, 0), 0, 0, 0, 0, 0, 0);
|
||||
|
||||
RMAppMetrics appMetrics =
|
||||
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, new HashMap<String, Long>(),
|
||||
new HashMap<String, Long>());
|
||||
when(app.getRMAppMetrics()).thenReturn(appMetrics);
|
||||
|
||||
// initialize RM Context, and create RMApp, without creating RMAppAttempt
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.junit.Test;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
|
@ -136,8 +137,8 @@ public class TestRMWebAppFairScheduler {
|
|||
MockRMApp app = new MockRMApp(i, i, state) {
|
||||
@Override
|
||||
public RMAppMetrics getRMAppMetrics() {
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0),
|
||||
0, 0, 0, 0, 0, 0);
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0,
|
||||
new HashMap<String, Long>(), new HashMap<String, Long>());
|
||||
}
|
||||
@Override
|
||||
public YarnApplicationState createApplicationState() {
|
||||
|
|
Loading…
Reference in New Issue