YARN-4218. Metric for resource*time that was preempted. Contributed by Chang Li.
(cherry picked from commit dd5b9dabf9
)
This commit is contained in:
parent
d81706cd99
commit
1a6a5af44a
|
@ -36,7 +36,8 @@ public abstract class ApplicationResourceUsageReport {
|
|||
public static ApplicationResourceUsageReport newInstance(
|
||||
int numUsedContainers, int numReservedContainers, Resource usedResources,
|
||||
Resource reservedResources, Resource neededResources, long memorySeconds,
|
||||
long vcoreSeconds, float queueUsagePerc, float clusterUsagePerc) {
|
||||
long vcoreSeconds, float queueUsagePerc, float clusterUsagePerc,
|
||||
long preemptedMemorySeconds, long preemptedVcoresSeconds) {
|
||||
ApplicationResourceUsageReport report =
|
||||
Records.newRecord(ApplicationResourceUsageReport.class);
|
||||
report.setNumUsedContainers(numUsedContainers);
|
||||
|
@ -48,6 +49,8 @@ public abstract class ApplicationResourceUsageReport {
|
|||
report.setVcoreSeconds(vcoreSeconds);
|
||||
report.setQueueUsagePercentage(queueUsagePerc);
|
||||
report.setClusterUsagePercentage(clusterUsagePerc);
|
||||
report.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
report.setPreemptedVcoreSeconds(preemptedVcoresSeconds);
|
||||
return report;
|
||||
}
|
||||
|
||||
|
@ -188,4 +191,42 @@ public abstract class ApplicationResourceUsageReport {
|
|||
@Private
|
||||
@Unstable
|
||||
public abstract void setClusterUsagePercentage(float clusterUsagePerc);
|
||||
|
||||
/**
|
||||
* Set the aggregated amount of memory preempted (in megabytes)
|
||||
* the application has allocated times the number of seconds
|
||||
* the application has been running.
|
||||
* @param memorySeconds the aggregated amount of memory seconds
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setPreemptedMemorySeconds(long memorySeconds);
|
||||
|
||||
/**
|
||||
* Get the aggregated amount of memory preempted(in megabytes)
|
||||
* the application has allocated times the number of
|
||||
* seconds the application has been running.
|
||||
* @return the aggregated amount of memory seconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract long getPreemptedMemorySeconds();
|
||||
|
||||
/**
|
||||
* Set the aggregated number of vcores preempted that the application has
|
||||
* allocated times the number of seconds the application has been running.
|
||||
* @param vcoreSeconds the aggregated number of vcore seconds
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setPreemptedVcoreSeconds(long vcoreSeconds);
|
||||
|
||||
/**
|
||||
* Get the aggregated number of vcores preempted that the application has
|
||||
* allocated times the number of seconds the application has been running.
|
||||
* @return the aggregated number of vcore seconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract long getPreemptedVcoreSeconds();
|
||||
}
|
||||
|
|
|
@ -181,6 +181,8 @@ message ApplicationResourceUsageReportProto {
|
|||
optional int64 vcore_seconds = 7;
|
||||
optional float queue_usage_percentage = 8;
|
||||
optional float cluster_usage_percentage = 9;
|
||||
optional int64 preempted_memory_seconds = 10;
|
||||
optional int64 preempted_vcore_seconds = 11;
|
||||
}
|
||||
|
||||
message ApplicationReportProto {
|
||||
|
|
|
@ -636,8 +636,15 @@ public class ApplicationCLI extends YarnCLI {
|
|||
//completed app report in the timeline server doesn't have usage report
|
||||
appReportStr.print(usageReport.getMemorySeconds() + " MB-seconds, ");
|
||||
appReportStr.println(usageReport.getVcoreSeconds() + " vcore-seconds");
|
||||
appReportStr.print("\tAggregate Resource Preempted : ");
|
||||
appReportStr.print(usageReport.getPreemptedMemorySeconds() +
|
||||
" MB-seconds, ");
|
||||
appReportStr.println(usageReport.getPreemptedVcoreSeconds() +
|
||||
" vcore-seconds");
|
||||
} else {
|
||||
appReportStr.println("N/A");
|
||||
appReportStr.print("\tAggregate Resource Preempted : ");
|
||||
appReportStr.println("N/A");
|
||||
}
|
||||
appReportStr.print("\tLog Aggregation Status : ");
|
||||
appReportStr.println(appReport.getLogAggregationStatus() == null ? "N/A"
|
||||
|
|
|
@ -112,7 +112,7 @@ public class TestYarnCLI {
|
|||
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
|
||||
ApplicationResourceUsageReport usageReport = i == 0 ? null :
|
||||
ApplicationResourceUsageReport.newInstance(
|
||||
2, 0, null, null, null, 123456, 4567, 0, 0);
|
||||
2, 0, null, null, null, 123456, 4567, 0, 0, 1111, 2222);
|
||||
ApplicationReport newApplicationReport = ApplicationReport.newInstance(
|
||||
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
|
||||
"user", "queue", "appname", "host", 124, null,
|
||||
|
@ -145,6 +145,8 @@ public class TestYarnCLI {
|
|||
pw.println("\tAM Host : host");
|
||||
pw.println("\tAggregate Resource Allocation : " +
|
||||
(i == 0 ? "N/A" : "123456 MB-seconds, 4567 vcore-seconds"));
|
||||
pw.println("\tAggregate Resource Preempted : " +
|
||||
(i == 0 ? "N/A" : "1111 MB-seconds, 2222 vcore-seconds"));
|
||||
pw.println("\tLog Aggregation Status : SUCCEEDED");
|
||||
pw.println("\tDiagnostics : diagnostics");
|
||||
pw.println("\tUnmanaged Application : false");
|
||||
|
|
|
@ -224,6 +224,34 @@ extends ApplicationResourceUsageReport {
|
|||
return (p.getVcoreSeconds());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setPreemptedMemorySeconds(
|
||||
long preemptedMemorySeconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized long getPreemptedMemorySeconds() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getPreemptedMemorySeconds();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setPreemptedVcoreSeconds(
|
||||
long vcoreSeconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setPreemptedVcoreSeconds(vcoreSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized long getPreemptedVcoreSeconds() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return (p.getPreemptedVcoreSeconds());
|
||||
}
|
||||
|
||||
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
|
||||
return new ResourcePBImpl(p);
|
||||
}
|
||||
|
|
|
@ -334,9 +334,15 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
ApplicationMetricsConstants.APP_CPU_METRICS).toString());
|
||||
long memorySeconds=Long.parseLong(entityInfo.get(
|
||||
ApplicationMetricsConstants.APP_MEM_METRICS).toString());
|
||||
long preemptedMemorySeconds = Long.parseLong(entityInfo.get(
|
||||
ApplicationMetricsConstants
|
||||
.APP_MEM_PREEMPT_METRICS).toString());
|
||||
long preemptedVcoreSeconds = Long.parseLong(entityInfo.get(
|
||||
ApplicationMetricsConstants
|
||||
.APP_CPU_PREEMPT_METRICS).toString());
|
||||
appResources = ApplicationResourceUsageReport
|
||||
.newInstance(0, 0, null, null, null, memorySeconds, vcoreSeconds, 0,
|
||||
0);
|
||||
0, preemptedMemorySeconds, preemptedVcoreSeconds);
|
||||
}
|
||||
if (entityInfo.containsKey(ApplicationMetricsConstants.APP_TAGS_INFO)) {
|
||||
appTags = new HashSet<String>();
|
||||
|
|
|
@ -233,6 +233,11 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
applicationResourceUsageReport.getMemorySeconds());
|
||||
Assert
|
||||
.assertEquals(345, applicationResourceUsageReport.getVcoreSeconds());
|
||||
Assert.assertEquals(456,
|
||||
applicationResourceUsageReport.getPreemptedMemorySeconds());
|
||||
Assert
|
||||
.assertEquals(789, applicationResourceUsageReport
|
||||
.getPreemptedVcoreSeconds());
|
||||
Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
|
||||
app.getFinalApplicationStatus());
|
||||
Assert.assertEquals(YarnApplicationState.FINISHED,
|
||||
|
@ -490,8 +495,10 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
Priority.newInstance(0));
|
||||
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
|
||||
Integer.MAX_VALUE + 1L);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_MEM_METRICS,123);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_CPU_METRICS,345);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_MEM_METRICS, 123);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_CPU_METRICS, 345);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS,456);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS,789);
|
||||
if (emptyACLs) {
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, "");
|
||||
} else {
|
||||
|
|
|
@ -76,6 +76,12 @@ public class ApplicationMetricsConstants {
|
|||
public static final String APP_MEM_METRICS =
|
||||
"YARN_APPLICATION_MEM_METRIC";
|
||||
|
||||
public static final String APP_CPU_PREEMPT_METRICS =
|
||||
"YARN_APPLICATION_CPU_PREEMPT_METRIC";
|
||||
|
||||
public static final String APP_MEM_PREEMPT_METRICS =
|
||||
"YARN_APPLICATION_MEM_PREEMPT_METRIC";
|
||||
|
||||
public static final String LATEST_APP_ATTEMPT_EVENT_INFO =
|
||||
"YARN_APPLICATION_LATEST_APP_ATTEMPT";
|
||||
|
||||
|
|
|
@ -415,7 +415,8 @@ public class BuilderUtils {
|
|||
public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
|
||||
int numUsedContainers, int numReservedContainers, Resource usedResources,
|
||||
Resource reservedResources, Resource neededResources, long memorySeconds,
|
||||
long vcoreSeconds) {
|
||||
long vcoreSeconds, long preemptedMemorySeconds,
|
||||
long preemptedVcoreSeconds) {
|
||||
ApplicationResourceUsageReport report =
|
||||
recordFactory.newRecordInstance(ApplicationResourceUsageReport.class);
|
||||
report.setNumUsedContainers(numUsedContainers);
|
||||
|
@ -425,6 +426,8 @@ public class BuilderUtils {
|
|||
report.setNeededResources(neededResources);
|
||||
report.setMemorySeconds(memorySeconds);
|
||||
report.setVcoreSeconds(vcoreSeconds);
|
||||
report.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
report.setPreemptedVcoreSeconds(preemptedVcoreSeconds);
|
||||
return report;
|
||||
}
|
||||
|
||||
|
|
|
@ -170,6 +170,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
.add("finalStatus", app.getFinalApplicationStatus())
|
||||
.add("memorySeconds", metrics.getMemorySeconds())
|
||||
.add("vcoreSeconds", metrics.getVcoreSeconds())
|
||||
.add("preemptedMemorySeconds", metrics.getPreemptedMemorySeconds())
|
||||
.add("preemptedVcoreSeconds", metrics.getPreemptedVcoreSeconds())
|
||||
.add("preemptedAMContainers", metrics.getNumAMContainersPreempted())
|
||||
.add("preemptedNonAMContainers", metrics.getNumNonAMContainersPreempted())
|
||||
.add("preemptedResources", metrics.getResourcePreempted())
|
||||
|
|
|
@ -447,7 +447,7 @@ public class RMServerUtils {
|
|||
DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
|
||||
BuilderUtils.newApplicationResourceUsageReport(-1, -1,
|
||||
Resources.createResource(-1, -1), Resources.createResource(-1, -1),
|
||||
Resources.createResource(-1, -1), 0, 0);
|
||||
Resources.createResource(-1, -1), 0, 0, 0, 0);
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -357,7 +357,11 @@ public class SystemMetricsPublisher extends CompositeService {
|
|||
appMetrics.getVcoreSeconds());
|
||||
entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
|
||||
appMetrics.getMemorySeconds());
|
||||
|
||||
entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS,
|
||||
appMetrics.getPreemptedMemorySeconds());
|
||||
entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS,
|
||||
appMetrics.getPreemptedVcoreSeconds());
|
||||
|
||||
tEvent.setEventInfo(eventInfo);
|
||||
entity.addEvent(tEvent);
|
||||
putEntity(entity);
|
||||
|
|
|
@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppR
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
||||
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
|
||||
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.StateMachine;
|
||||
|
@ -782,16 +783,19 @@ public abstract class RMStateStore extends AbstractService {
|
|||
*/
|
||||
public void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
|
||||
Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
|
||||
|
||||
RMAppAttemptMetrics attempMetrics = appAttempt.getRMAppAttemptMetrics();
|
||||
AggregateAppResourceUsage resUsage =
|
||||
appAttempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
|
||||
attempMetrics.getAggregateAppResourceUsage();
|
||||
ApplicationAttemptStateData attemptState =
|
||||
ApplicationAttemptStateData.newInstance(
|
||||
appAttempt.getAppAttemptId(),
|
||||
appAttempt.getMasterContainer(),
|
||||
credentials, appAttempt.getStartTime(),
|
||||
resUsage.getMemorySeconds(),
|
||||
resUsage.getVcoreSeconds());
|
||||
resUsage.getVcoreSeconds(),
|
||||
attempMetrics.getPreemptedMemory(),
|
||||
attempMetrics.getPreemptedVcore()
|
||||
);
|
||||
|
||||
dispatcher.getEventHandler().handle(
|
||||
new RMStateStoreAppAttemptEvent(attemptState));
|
||||
|
|
|
@ -40,7 +40,8 @@ public abstract class ApplicationAttemptStateData {
|
|||
Credentials attemptTokens, long startTime, RMAppAttemptState finalState,
|
||||
String finalTrackingUrl, String diagnostics,
|
||||
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
|
||||
long finishTime, long memorySeconds, long vcoreSeconds) {
|
||||
long finishTime, long memorySeconds, long vcoreSeconds,
|
||||
long preemptedMemorySeconds, long preemptedVcoreSeconds) {
|
||||
ApplicationAttemptStateData attemptStateData =
|
||||
Records.newRecord(ApplicationAttemptStateData.class);
|
||||
attemptStateData.setAttemptId(attemptId);
|
||||
|
@ -55,16 +56,20 @@ public abstract class ApplicationAttemptStateData {
|
|||
attemptStateData.setFinishTime(finishTime);
|
||||
attemptStateData.setMemorySeconds(memorySeconds);
|
||||
attemptStateData.setVcoreSeconds(vcoreSeconds);
|
||||
attemptStateData.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
attemptStateData.setPreemptedVcoreSeconds(preemptedVcoreSeconds);
|
||||
return attemptStateData;
|
||||
}
|
||||
|
||||
public static ApplicationAttemptStateData newInstance(
|
||||
ApplicationAttemptId attemptId, Container masterContainer,
|
||||
Credentials attemptTokens, long startTime, long memorySeconds,
|
||||
long vcoreSeconds) {
|
||||
long vcoreSeconds, long preemptedMemorySeconds,
|
||||
long preemptedVcoreSeconds) {
|
||||
return newInstance(attemptId, masterContainer, attemptTokens,
|
||||
startTime, null, "N/A", "", null, ContainerExitStatus.INVALID, 0,
|
||||
memorySeconds, vcoreSeconds);
|
||||
memorySeconds, vcoreSeconds,
|
||||
preemptedMemorySeconds, preemptedVcoreSeconds);
|
||||
}
|
||||
|
||||
|
||||
|
@ -182,4 +187,32 @@ public abstract class ApplicationAttemptStateData {
|
|||
@Public
|
||||
@Unstable
|
||||
public abstract void setVcoreSeconds(long vcoreSeconds);
|
||||
|
||||
/**
|
||||
* Get the <em>preempted memory seconds</em>
|
||||
* (in MB seconds) of the application.
|
||||
* @return <em>preempted memory seconds</em>
|
||||
* (in MB seconds) of the application
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract long getPreemptedMemorySeconds();
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setPreemptedMemorySeconds(long memorySeconds);
|
||||
|
||||
/**
|
||||
* Get the <em>preempted vcore seconds</em>
|
||||
* of the application.
|
||||
* @return <em>preempted vcore seconds</em>
|
||||
* of the application
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract long getPreemptedVcoreSeconds();
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setPreemptedVcoreSeconds(long vcoreSeconds);
|
||||
}
|
||||
|
|
|
@ -262,6 +262,30 @@ public class ApplicationAttemptStateDataPBImpl extends
|
|||
builder.setVcoreSeconds(vcoreSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPreemptedMemorySeconds() {
|
||||
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getPreemptedMemorySeconds();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPreemptedVcoreSeconds() {
|
||||
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getPreemptedVcoreSeconds();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPreemptedMemorySeconds(long memorySeconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setPreemptedMemorySeconds(memorySeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPreemptedVcoreSeconds(long vcoreSeconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setPreemptedVcoreSeconds(vcoreSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FinalApplicationStatus getFinalApplicationStatus() {
|
||||
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
|
|
@ -679,6 +679,12 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
RMAppMetrics rmAppMetrics = getRMAppMetrics();
|
||||
appUsageReport.setMemorySeconds(rmAppMetrics.getMemorySeconds());
|
||||
appUsageReport.setVcoreSeconds(rmAppMetrics.getVcoreSeconds());
|
||||
appUsageReport.
|
||||
setPreemptedMemorySeconds(rmAppMetrics.
|
||||
getPreemptedMemorySeconds());
|
||||
appUsageReport.
|
||||
setPreemptedVcoreSeconds(rmAppMetrics.
|
||||
getPreemptedVcoreSeconds());
|
||||
}
|
||||
|
||||
if (currentApplicationAttemptId == null) {
|
||||
|
@ -1449,6 +1455,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
int numNonAMContainerPreempted = 0;
|
||||
long memorySeconds = 0;
|
||||
long vcoreSeconds = 0;
|
||||
long preemptedMemorySeconds = 0;
|
||||
long preemptedVcoreSeconds = 0;
|
||||
for (RMAppAttempt attempt : attempts.values()) {
|
||||
if (null != attempt) {
|
||||
RMAppAttemptMetrics attemptMetrics =
|
||||
|
@ -1464,12 +1472,15 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
attempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
|
||||
memorySeconds += resUsage.getMemorySeconds();
|
||||
vcoreSeconds += resUsage.getVcoreSeconds();
|
||||
preemptedMemorySeconds += attemptMetrics.getPreemptedMemory();
|
||||
preemptedVcoreSeconds += attemptMetrics.getPreemptedVcore();
|
||||
}
|
||||
}
|
||||
|
||||
return new RMAppMetrics(resourcePreempted,
|
||||
numNonAMContainerPreempted, numAMContainerPreempted,
|
||||
memorySeconds, vcoreSeconds);
|
||||
memorySeconds, vcoreSeconds,
|
||||
preemptedMemorySeconds, preemptedVcoreSeconds);
|
||||
}
|
||||
|
||||
@Private
|
||||
|
|
|
@ -26,15 +26,20 @@ public class RMAppMetrics {
|
|||
final int numAMContainersPreempted;
|
||||
final long memorySeconds;
|
||||
final long vcoreSeconds;
|
||||
private final long preemptedMemorySeconds;
|
||||
private final long preemptedVcoreSeconds;
|
||||
|
||||
public RMAppMetrics(Resource resourcePreempted,
|
||||
int numNonAMContainersPreempted, int numAMContainersPreempted,
|
||||
long memorySeconds, long vcoreSeconds) {
|
||||
long memorySeconds, long vcoreSeconds, long preemptedMemorySeconds,
|
||||
long preemptedVcoreSeconds) {
|
||||
this.resourcePreempted = resourcePreempted;
|
||||
this.numNonAMContainersPreempted = numNonAMContainersPreempted;
|
||||
this.numAMContainersPreempted = numAMContainersPreempted;
|
||||
this.memorySeconds = memorySeconds;
|
||||
this.vcoreSeconds = vcoreSeconds;
|
||||
this.preemptedMemorySeconds = preemptedMemorySeconds;
|
||||
this.preemptedVcoreSeconds = preemptedVcoreSeconds;
|
||||
}
|
||||
|
||||
public Resource getResourcePreempted() {
|
||||
|
@ -56,4 +61,13 @@ public class RMAppMetrics {
|
|||
public long getVcoreSeconds() {
|
||||
return vcoreSeconds;
|
||||
}
|
||||
|
||||
public long getPreemptedMemorySeconds() {
|
||||
return preemptedMemorySeconds;
|
||||
}
|
||||
|
||||
public long getPreemptedVcoreSeconds() {
|
||||
return preemptedVcoreSeconds;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -890,6 +890,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
this.attemptMetrics.getAggregateAppResourceUsage();
|
||||
report.setMemorySeconds(resUsage.getMemorySeconds());
|
||||
report.setVcoreSeconds(resUsage.getVcoreSeconds());
|
||||
report.setPreemptedMemorySeconds(
|
||||
this.attemptMetrics.getPreemptedMemory());
|
||||
report.setPreemptedVcoreSeconds(
|
||||
this.attemptMetrics.getPreemptedVcore());
|
||||
return report;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
|
@ -922,6 +926,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
this.finishTime = attemptState.getFinishTime();
|
||||
this.attemptMetrics.updateAggregateAppResourceUsage(
|
||||
attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
|
||||
this.attemptMetrics.updateAggregatePreemptedAppResourceUsage(
|
||||
attemptState.getPreemptedMemorySeconds(),
|
||||
attemptState.getPreemptedVcoreSeconds());
|
||||
}
|
||||
|
||||
public void transferStateFromAttempt(RMAppAttempt attempt) {
|
||||
|
@ -1298,7 +1305,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
startTime, stateToBeStored, finalTrackingUrl, diags,
|
||||
finalStatus, exitStatus,
|
||||
getFinishTime(), resUsage.getMemorySeconds(),
|
||||
resUsage.getVcoreSeconds());
|
||||
resUsage.getVcoreSeconds(),
|
||||
this.attemptMetrics.getPreemptedMemory(),
|
||||
this.attemptMetrics.getPreemptedVcore());
|
||||
LOG.info("Updating application attempt " + applicationAttemptId
|
||||
+ " with final state: " + targetedFinalState + ", and exit status: "
|
||||
+ exitStatus);
|
||||
|
|
|
@ -50,6 +50,8 @@ public class RMAppAttemptMetrics {
|
|||
private WriteLock writeLock;
|
||||
private AtomicLong finishedMemorySeconds = new AtomicLong(0);
|
||||
private AtomicLong finishedVcoreSeconds = new AtomicLong(0);
|
||||
private AtomicLong preemptedMemorySeconds = new AtomicLong(0);
|
||||
private AtomicLong preemptedVcoreSeconds = new AtomicLong(0);
|
||||
private RMContext rmContext;
|
||||
|
||||
private int[][] localityStatistics =
|
||||
|
@ -98,6 +100,14 @@ public class RMAppAttemptMetrics {
|
|||
}
|
||||
}
|
||||
|
||||
public long getPreemptedMemory() {
|
||||
return preemptedMemorySeconds.get();
|
||||
}
|
||||
|
||||
public long getPreemptedVcore() {
|
||||
return preemptedVcoreSeconds.get();
|
||||
}
|
||||
|
||||
public int getNumNonAMContainersPreempted() {
|
||||
return numNonAMContainersPreempted.get();
|
||||
}
|
||||
|
@ -134,6 +144,12 @@ public class RMAppAttemptMetrics {
|
|||
this.finishedVcoreSeconds.addAndGet(finishedVcoreSeconds);
|
||||
}
|
||||
|
||||
public void updateAggregatePreemptedAppResourceUsage(
|
||||
long preemptedMemorySeconds, long preemptedVcoreSeconds) {
|
||||
this.preemptedMemorySeconds.addAndGet(preemptedMemorySeconds);
|
||||
this.preemptedVcoreSeconds.addAndGet(preemptedVcoreSeconds);
|
||||
}
|
||||
|
||||
public void incNumAllocatedContainers(NodeType containerType,
|
||||
NodeType requestType) {
|
||||
localityStatistics[containerType.index][requestType.index]++;
|
||||
|
|
|
@ -701,12 +701,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
|||
.getCurrentAppAttempt();
|
||||
|
||||
if (rmAttempt != null) {
|
||||
if (ContainerExitStatus.PREEMPTED == container.finishedStatus
|
||||
.getExitStatus()) {
|
||||
rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
|
||||
container);
|
||||
}
|
||||
|
||||
long usedMillis = container.finishTime - container.creationTime;
|
||||
long memorySeconds = resource.getMemorySize()
|
||||
* usedMillis / DateUtils.MILLIS_PER_SECOND;
|
||||
|
@ -714,6 +708,15 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
|||
* usedMillis / DateUtils.MILLIS_PER_SECOND;
|
||||
rmAttempt.getRMAppAttemptMetrics()
|
||||
.updateAggregateAppResourceUsage(memorySeconds,vcoreSeconds);
|
||||
// If this is a preempted container, update preemption metrics
|
||||
if (ContainerExitStatus.PREEMPTED == container.finishedStatus
|
||||
.getExitStatus()) {
|
||||
rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
|
||||
container);
|
||||
rmAttempt.getRMAppAttemptMetrics()
|
||||
.updateAggregatePreemptedAppResourceUsage(memorySeconds,
|
||||
vcoreSeconds);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -718,7 +718,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
Resources.add(usedResourceClone, reservedResourceClone),
|
||||
runningResourceUsage.getMemorySeconds(),
|
||||
runningResourceUsage.getVcoreSeconds(), queueUsagePerc,
|
||||
clusterUsagePerc);
|
||||
clusterUsagePerc, 0, 0);
|
||||
}
|
||||
|
||||
public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
|
||||
|
@ -1000,4 +1000,4 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
return diagnosticMessage;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,7 +101,13 @@ public class RMAppBlock extends AppBlock{
|
|||
._("Aggregate Resource Allocation:",
|
||||
String.format("%d MB-seconds, %d vcore-seconds",
|
||||
appMetrics == null ? "N/A" : appMetrics.getMemorySeconds(),
|
||||
appMetrics == null ? "N/A" : appMetrics.getVcoreSeconds()));
|
||||
appMetrics == null ? "N/A" : appMetrics.getVcoreSeconds()))
|
||||
._("Aggregate Preempted Resource Allocation:",
|
||||
String.format("%d MB-seconds, %d vcore-seconds",
|
||||
appMetrics == null ? "N/A" : appMetrics.getPreemptedMemorySeconds(),
|
||||
appMetrics == null ? "N/A" :
|
||||
appMetrics.getPreemptedVcoreSeconds()));
|
||||
|
||||
pdiv._();
|
||||
}
|
||||
|
||||
|
|
|
@ -98,6 +98,8 @@ public class AppInfo {
|
|||
protected long preemptedResourceVCores;
|
||||
protected int numNonAMContainerPreempted;
|
||||
protected int numAMContainerPreempted;
|
||||
private long preemptedMemorySeconds;
|
||||
private long preemptedVcoreSeconds;
|
||||
|
||||
protected List<ResourceRequest> resourceRequests;
|
||||
|
||||
|
@ -203,6 +205,8 @@ public class AppInfo {
|
|||
appMetrics.getResourcePreempted().getVirtualCores();
|
||||
memorySeconds = appMetrics.getMemorySeconds();
|
||||
vcoreSeconds = appMetrics.getVcoreSeconds();
|
||||
preemptedMemorySeconds = appMetrics.getPreemptedMemorySeconds();
|
||||
preemptedVcoreSeconds = appMetrics.getPreemptedVcoreSeconds();
|
||||
unmanagedApplication =
|
||||
appSubmissionContext.getUnmanagedAM();
|
||||
appNodeLabelExpression =
|
||||
|
@ -369,6 +373,14 @@ public class AppInfo {
|
|||
return vcoreSeconds;
|
||||
}
|
||||
|
||||
public long getPreemptedMemorySeconds() {
|
||||
return preemptedMemorySeconds;
|
||||
}
|
||||
|
||||
public long getPreemptedVcoreSeconds() {
|
||||
return preemptedVcoreSeconds;
|
||||
}
|
||||
|
||||
public List<ResourceRequest> getResourceRequests() {
|
||||
return this.resourceRequests;
|
||||
}
|
||||
|
|
|
@ -84,6 +84,8 @@ message ApplicationAttemptStateDataProto {
|
|||
optional int64 memory_seconds = 10;
|
||||
optional int64 vcore_seconds = 11;
|
||||
optional int64 finish_time = 12;
|
||||
optional int64 preempted_memory_seconds = 13;
|
||||
optional int64 preempted_vcore_seconds = 14;
|
||||
}
|
||||
|
||||
message EpochProto {
|
||||
|
|
|
@ -640,7 +640,8 @@ public class TestAppManager{
|
|||
when(app.getState()).thenReturn(RMAppState.RUNNING);
|
||||
when(app.getApplicationType()).thenReturn("MAPREDUCE");
|
||||
RMAppMetrics metrics =
|
||||
new RMAppMetrics(Resource.newInstance(1234, 56), 10, 1, 16384, 64);
|
||||
new RMAppMetrics(Resource.newInstance(1234, 56),
|
||||
10, 1, 16384, 64, 0, 0);
|
||||
when(app.getRMAppMetrics()).thenReturn(metrics);
|
||||
|
||||
RMAppManager.ApplicationSummary.SummaryBuilder summary =
|
||||
|
|
|
@ -181,7 +181,7 @@ public abstract class MockAsm extends MockApps {
|
|||
|
||||
@Override
|
||||
public RMAppMetrics getRMAppMetrics() {
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0);
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0, 0, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -310,7 +310,7 @@ public abstract class MockAsm extends MockApps {
|
|||
String clientUserName, boolean allowAccess) {
|
||||
ApplicationResourceUsageReport usageReport =
|
||||
ApplicationResourceUsageReport.newInstance(0, 0, null, null, null,
|
||||
0, 0, 0, 0);
|
||||
0, 0, 0, 0, 0, 0);
|
||||
ApplicationReport report = ApplicationReport.newInstance(
|
||||
getApplicationId(), appAttemptId, getUser(), getQueue(),
|
||||
getName(), null, 0, null, null, getDiagnostics().toString(),
|
||||
|
|
|
@ -222,6 +222,16 @@ public class TestSystemMetricsPublisher {
|
|||
app.getRMAppMetrics().getVcoreSeconds(),
|
||||
Long.parseLong(entity.getOtherInfo()
|
||||
.get(ApplicationMetricsConstants.APP_CPU_METRICS).toString()));
|
||||
Assert.assertEquals(
|
||||
app.getRMAppMetrics().getPreemptedMemorySeconds(),
|
||||
Long.parseLong(entity.getOtherInfo()
|
||||
.get(ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS)
|
||||
.toString()));
|
||||
Assert.assertEquals(
|
||||
app.getRMAppMetrics().getPreemptedVcoreSeconds(),
|
||||
Long.parseLong(entity.getOtherInfo()
|
||||
.get(ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS)
|
||||
.toString()));
|
||||
}
|
||||
Assert.assertEquals("context", entity.getOtherInfo()
|
||||
.get(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT));
|
||||
|
@ -495,7 +505,8 @@ public class TestSystemMetricsPublisher {
|
|||
when(app.getFinalApplicationStatus()).thenReturn(
|
||||
FinalApplicationStatus.UNDEFINED);
|
||||
when(app.getRMAppMetrics()).thenReturn(
|
||||
new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE));
|
||||
new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
|
||||
Integer.MAX_VALUE, Long.MAX_VALUE));
|
||||
Set<String> appTags = new HashSet<String>();
|
||||
appTags.add("test");
|
||||
appTags.add("tags");
|
||||
|
|
|
@ -357,7 +357,7 @@ public class RMStateStoreTestBase {
|
|||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics",
|
||||
FinalApplicationStatus.SUCCEEDED, 100,
|
||||
oldAttemptState.getFinishTime(), 0, 0);
|
||||
oldAttemptState.getFinishTime(), 0, 0, 0, 0);
|
||||
store.updateApplicationAttemptState(newAttemptState);
|
||||
|
||||
// test updating the state of an app/attempt whose initial state was not
|
||||
|
@ -381,7 +381,7 @@ public class RMStateStoreTestBase {
|
|||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics",
|
||||
FinalApplicationStatus.SUCCEEDED, 111,
|
||||
oldAttemptState.getFinishTime(), 0, 0);
|
||||
oldAttemptState.getFinishTime(), 0, 0, 0, 0);
|
||||
store.updateApplicationAttemptState(dummyAttempt);
|
||||
|
||||
// let things settle down
|
||||
|
|
|
@ -415,7 +415,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||
store.getCredentialsFromAppAttempt(mockAttempt),
|
||||
startTime, RMAppAttemptState.FINISHED, "testUrl",
|
||||
"test", FinalApplicationStatus.SUCCEEDED, 100,
|
||||
finishTime, 0, 0);
|
||||
finishTime, 0, 0, 0, 0);
|
||||
store.updateApplicationAttemptState(newAttemptState);
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
|
|
@ -64,7 +64,8 @@ public class TestAppPage {
|
|||
when(app.getFinishTime()).thenReturn(0L);
|
||||
when(app.createApplicationState()).thenReturn(YarnApplicationState.FAILED);
|
||||
|
||||
RMAppMetrics appMetrics = new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0);
|
||||
RMAppMetrics appMetrics = new RMAppMetrics(
|
||||
Resource.newInstance(0, 0), 0, 0, 0, 0, 0, 0);
|
||||
when(app.getRMAppMetrics()).thenReturn(appMetrics);
|
||||
|
||||
// initialize RM Context, and create RMApp, without creating RMAppAttempt
|
||||
|
|
|
@ -142,7 +142,8 @@ public class TestRMWebAppFairScheduler {
|
|||
MockRMApp app = new MockRMApp(i, i, state) {
|
||||
@Override
|
||||
public RMAppMetrics getRMAppMetrics() {
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0);
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0),
|
||||
0, 0, 0, 0, 0, 0);
|
||||
}
|
||||
@Override
|
||||
public YarnApplicationState createApplicationState() {
|
||||
|
|
|
@ -1325,7 +1325,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
|||
public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
|
||||
Exception {
|
||||
|
||||
int expectedNumberOfElements = 32;
|
||||
int expectedNumberOfElements = 34;
|
||||
String appNodeLabelExpression = null;
|
||||
String amNodeLabelExpression = null;
|
||||
if (app.getApplicationSubmissionContext()
|
||||
|
|
Loading…
Reference in New Issue