YARN-415. Capture aggregate memory allocation at the app-level for chargeback. Contributed by Eric Payne & Andrey Klochkov

(cherry picked from commit 83be3ad444)
This commit is contained in:
Jian He 2014-09-10 18:19:53 -07:00
parent 5ca99b5297
commit ceae7be6b2
38 changed files with 1045 additions and 97 deletions

View File

@ -44,6 +44,9 @@ Release 2.6.0 - UNRELEASED
YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue. YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue.
(Wei Yan via kasha) (Wei Yan via kasha)
YARN-415. Capture aggregate memory allocation at the app-level for chargeback.
(Eric Payne & Andrey Klochkov via jianhe)
IMPROVEMENTS IMPROVEMENTS
YARN-2242. Improve exception information on AM launch crashes. (Li Lu YARN-2242. Improve exception information on AM launch crashes. (Li Lu

View File

@ -35,7 +35,8 @@ public abstract class ApplicationResourceUsageReport {
@Unstable @Unstable
public static ApplicationResourceUsageReport newInstance( public static ApplicationResourceUsageReport newInstance(
int numUsedContainers, int numReservedContainers, Resource usedResources, int numUsedContainers, int numReservedContainers, Resource usedResources,
Resource reservedResources, Resource neededResources) { Resource reservedResources, Resource neededResources, long memorySeconds,
long vcoreSeconds) {
ApplicationResourceUsageReport report = ApplicationResourceUsageReport report =
Records.newRecord(ApplicationResourceUsageReport.class); Records.newRecord(ApplicationResourceUsageReport.class);
report.setNumUsedContainers(numUsedContainers); report.setNumUsedContainers(numUsedContainers);
@ -43,6 +44,8 @@ public abstract class ApplicationResourceUsageReport {
report.setUsedResources(usedResources); report.setUsedResources(usedResources);
report.setReservedResources(reservedResources); report.setReservedResources(reservedResources);
report.setNeededResources(neededResources); report.setNeededResources(neededResources);
report.setMemorySeconds(memorySeconds);
report.setVcoreSeconds(vcoreSeconds);
return report; return report;
} }
@ -113,4 +116,40 @@ public abstract class ApplicationResourceUsageReport {
@Private @Private
@Unstable @Unstable
public abstract void setNeededResources(Resource needed_resources); public abstract void setNeededResources(Resource needed_resources);
/**
* Set the aggregated amount of memory (in megabytes) the application has
* allocated times the number of seconds the application has been running.
* @param memory_seconds the aggregated amount of memory seconds
*/
@Private
@Unstable
public abstract void setMemorySeconds(long memory_seconds);
/**
* Get the aggregated amount of memory (in megabytes) the application has
* allocated times the number of seconds the application has been running.
* @return the aggregated amount of memory seconds
*/
@Public
@Unstable
public abstract long getMemorySeconds();
/**
* Set the aggregated number of vcores that the application has allocated
* times the number of seconds the application has been running.
* @param vcore_seconds the aggregated number of vcore seconds
*/
@Private
@Unstable
public abstract void setVcoreSeconds(long vcore_seconds);
/**
* Get the aggregated number of vcores that the application has allocated
* times the number of seconds the application has been running.
* @return the aggregated number of vcore seconds
*/
@Public
@Unstable
public abstract long getVcoreSeconds();
} }

View File

@ -167,6 +167,8 @@ message ApplicationResourceUsageReportProto {
optional ResourceProto used_resources = 3; optional ResourceProto used_resources = 3;
optional ResourceProto reserved_resources = 4; optional ResourceProto reserved_resources = 4;
optional ResourceProto needed_resources = 5; optional ResourceProto needed_resources = 5;
optional int64 memory_seconds = 6;
optional int64 vcore_seconds = 7;
} }
message ApplicationReportProto { message ApplicationReportProto {

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
@ -460,6 +461,11 @@ public class ApplicationCLI extends YarnCLI {
appReportStr.println(appReport.getRpcPort()); appReportStr.println(appReport.getRpcPort());
appReportStr.print("\tAM Host : "); appReportStr.print("\tAM Host : ");
appReportStr.println(appReport.getHost()); appReportStr.println(appReport.getHost());
appReportStr.print("\tAggregate Resource Allocation : ");
ApplicationResourceUsageReport usageReport = appReport.getApplicationResourceUsageReport();
appReportStr.print(usageReport.getMemorySeconds() + " MB-seconds, ");
appReportStr.println(usageReport.getVcoreSeconds() + " vcore-seconds");
appReportStr.print("\tDiagnostics : "); appReportStr.print("\tDiagnostics : ");
appReportStr.print(appReport.getDiagnostics()); appReportStr.print(appReport.getDiagnostics());
} else { } else {

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
@ -87,11 +88,15 @@ public class TestYarnCLI {
public void testGetApplicationReport() throws Exception { public void testGetApplicationReport() throws Exception {
ApplicationCLI cli = createAndGetAppCLI(); ApplicationCLI cli = createAndGetAppCLI();
ApplicationId applicationId = ApplicationId.newInstance(1234, 5); ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
ApplicationResourceUsageReport usageReport =
ApplicationResourceUsageReport.newInstance(
2, 0, null, null, null, 123456, 4567);
ApplicationReport newApplicationReport = ApplicationReport.newInstance( ApplicationReport newApplicationReport = ApplicationReport.newInstance(
applicationId, ApplicationAttemptId.newInstance(applicationId, 1), applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null, "user", "queue", "appname", "host", 124, null,
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0, YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); FinalApplicationStatus.SUCCEEDED, usageReport, "N/A", 0.53789f, "YARN",
null);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport); newApplicationReport);
int result = cli.run(new String[] { "application", "-status", applicationId.toString() }); int result = cli.run(new String[] { "application", "-status", applicationId.toString() });
@ -113,6 +118,7 @@ public class TestYarnCLI {
pw.println("\tTracking-URL : N/A"); pw.println("\tTracking-URL : N/A");
pw.println("\tRPC Port : 124"); pw.println("\tRPC Port : 124");
pw.println("\tAM Host : host"); pw.println("\tAM Host : host");
pw.println("\tAggregate Resource Allocation : 123456 MB-seconds, 4567 vcore-seconds");
pw.println("\tDiagnostics : diagnostics"); pw.println("\tDiagnostics : diagnostics");
pw.close(); pw.close();
String appReportStr = baos.toString("UTF-8"); String appReportStr = baos.toString("UTF-8");

View File

@ -200,6 +200,30 @@ extends ApplicationResourceUsageReport {
this.neededResources = reserved_resources; this.neededResources = reserved_resources;
} }
@Override
public synchronized void setMemorySeconds(long memory_seconds) {
maybeInitBuilder();
builder.setMemorySeconds(memory_seconds);
}
@Override
public synchronized long getMemorySeconds() {
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getMemorySeconds();
}
@Override
public synchronized void setVcoreSeconds(long vcore_seconds) {
maybeInitBuilder();
builder.setVcoreSeconds(vcore_seconds);
}
@Override
public synchronized long getVcoreSeconds() {
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getVcoreSeconds());
}
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p); return new ResourcePBImpl(p);
} }

View File

@ -370,7 +370,8 @@ public class BuilderUtils {
public static ApplicationResourceUsageReport newApplicationResourceUsageReport( public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
int numUsedContainers, int numReservedContainers, Resource usedResources, int numUsedContainers, int numReservedContainers, Resource usedResources,
Resource reservedResources, Resource neededResources) { Resource reservedResources, Resource neededResources, long memorySeconds,
long vcoreSeconds) {
ApplicationResourceUsageReport report = ApplicationResourceUsageReport report =
recordFactory.newRecordInstance(ApplicationResourceUsageReport.class); recordFactory.newRecordInstance(ApplicationResourceUsageReport.class);
report.setNumUsedContainers(numUsedContainers); report.setNumUsedContainers(numUsedContainers);
@ -378,6 +379,8 @@ public class BuilderUtils {
report.setUsedResources(usedResources); report.setUsedResources(usedResources);
report.setReservedResources(reservedResources); report.setReservedResources(reservedResources);
report.setNeededResources(neededResources); report.setNeededResources(neededResources);
report.setMemorySeconds(memorySeconds);
report.setVcoreSeconds(vcoreSeconds);
return report; return report;
} }

View File

@ -236,5 +236,5 @@ public class RMServerUtils {
DUMMY_APPLICATION_RESOURCE_USAGE_REPORT = DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
BuilderUtils.newApplicationResourceUsageReport(-1, -1, BuilderUtils.newApplicationResourceUsageReport(-1, -1,
Resources.createResource(-1, -1), Resources.createResource(-1, -1), Resources.createResource(-1, -1), Resources.createResource(-1, -1),
Resources.createResource(-1, -1)); Resources.createResource(-1, -1), 0, 0);
} }

View File

@ -280,7 +280,9 @@ public class FileSystemRMStateStore extends RMStateStore {
attemptStateData.getFinalTrackingUrl(), attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(), attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(), attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus()); attemptStateData.getAMContainerExitStatus(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
// assert child node name is same as application attempt id // assert child node name is same as application attempt id
assert attemptId.equals(attemptState.getAttemptId()); assert attemptId.equals(attemptState.getAttemptId());

View File

@ -138,7 +138,10 @@ public class MemoryRMStateStore extends RMStateStore {
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttemptId, new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials, attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime()); attemptStateData.getStartTime(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
ApplicationState appState = state.getApplicationState().get( ApplicationState appState = state.getApplicationState().get(
attemptState.getAttemptId().getApplicationId()); attemptState.getAttemptId().getApplicationId());
@ -167,7 +170,9 @@ public class MemoryRMStateStore extends RMStateStore {
attemptStateData.getFinalTrackingUrl(), attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(), attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(), attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus()); attemptStateData.getAMContainerExitStatus(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
ApplicationState appState = ApplicationState appState =
state.getApplicationState().get( state.getApplicationState().get(

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -265,19 +266,21 @@ public abstract class RMStateStore extends AbstractService {
String diagnostics; String diagnostics;
int exitStatus = ContainerExitStatus.INVALID; int exitStatus = ContainerExitStatus.INVALID;
FinalApplicationStatus amUnregisteredFinalStatus; FinalApplicationStatus amUnregisteredFinalStatus;
long memorySeconds;
long vcoreSeconds;
public ApplicationAttemptState(ApplicationAttemptId attemptId, public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials, Container masterContainer, Credentials appAttemptCredentials,
long startTime) { long startTime, long memorySeconds, long vcoreSeconds) {
this(attemptId, masterContainer, appAttemptCredentials, startTime, null, this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
null, "", null, ContainerExitStatus.INVALID); null, "", null, ContainerExitStatus.INVALID, memorySeconds, vcoreSeconds);
} }
public ApplicationAttemptState(ApplicationAttemptId attemptId, public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials, Container masterContainer, Credentials appAttemptCredentials,
long startTime, RMAppAttemptState state, String finalTrackingUrl, long startTime, RMAppAttemptState state, String finalTrackingUrl,
String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
int exitStatus) { int exitStatus, long memorySeconds, long vcoreSeconds) {
this.attemptId = attemptId; this.attemptId = attemptId;
this.masterContainer = masterContainer; this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials; this.appAttemptCredentials = appAttemptCredentials;
@ -287,6 +290,8 @@ public abstract class RMStateStore extends AbstractService {
this.diagnostics = diagnostics == null ? "" : diagnostics; this.diagnostics = diagnostics == null ? "" : diagnostics;
this.amUnregisteredFinalStatus = amUnregisteredFinalStatus; this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
this.exitStatus = exitStatus; this.exitStatus = exitStatus;
this.memorySeconds = memorySeconds;
this.vcoreSeconds = vcoreSeconds;
} }
public Container getMasterContainer() { public Container getMasterContainer() {
@ -316,6 +321,12 @@ public abstract class RMStateStore extends AbstractService {
public int getAMContainerExitStatus(){ public int getAMContainerExitStatus(){
return this.exitStatus; return this.exitStatus;
} }
public long getMemorySeconds() {
return memorySeconds;
}
public long getVcoreSeconds() {
return vcoreSeconds;
}
} }
/** /**
@ -587,10 +598,13 @@ public abstract class RMStateStore extends AbstractService {
public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) { public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
Credentials credentials = getCredentialsFromAppAttempt(appAttempt); Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
AggregateAppResourceUsage resUsage =
appAttempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(), new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials, appAttempt.getMasterContainer(), credentials,
appAttempt.getStartTime()); appAttempt.getStartTime(), resUsage.getMemorySeconds(),
resUsage.getVcoreSeconds());
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState)); new RMStateStoreAppAttemptEvent(attemptState));
@ -746,7 +760,7 @@ public abstract class RMStateStore extends AbstractService {
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(), new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials, appAttempt.getMasterContainer(), credentials,
appAttempt.getStartTime()); appAttempt.getStartTime(), 0, 0);
appState.attempts.put(attemptState.getAttemptId(), attemptState); appState.attempts.put(attemptState.getAttemptId(), attemptState);
} }

View File

@ -603,7 +603,9 @@ public class ZKRMStateStore extends RMStateStore {
attemptStateData.getFinalTrackingUrl(), attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(), attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(), attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus()); attemptStateData.getAMContainerExitStatus(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
appState.attempts.put(attemptState.getAttemptId(), attemptState); appState.attempts.put(attemptState.getAttemptId(), attemptState);
} }

View File

@ -43,7 +43,8 @@ public abstract class ApplicationAttemptStateData {
ApplicationAttemptId attemptId, Container container, ApplicationAttemptId attemptId, Container container,
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
String finalTrackingUrl, String diagnostics, String finalTrackingUrl, String diagnostics,
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) { FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
long memorySeconds, long vcoreSeconds) {
ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class); Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId); attemptStateData.setAttemptId(attemptId);
@ -55,6 +56,8 @@ public abstract class ApplicationAttemptStateData {
attemptStateData.setStartTime(startTime); attemptStateData.setStartTime(startTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus); attemptStateData.setAMContainerExitStatus(exitStatus);
attemptStateData.setMemorySeconds(memorySeconds);
attemptStateData.setVcoreSeconds(vcoreSeconds);
return attemptStateData; return attemptStateData;
} }
@ -72,7 +75,8 @@ public abstract class ApplicationAttemptStateData {
attemptState.getStartTime(), attemptState.getState(), attemptState.getStartTime(), attemptState.getState(),
attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(), attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
attemptState.getFinalApplicationStatus(), attemptState.getFinalApplicationStatus(),
attemptState.getAMContainerExitStatus()); attemptState.getAMContainerExitStatus(),
attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
} }
public abstract ApplicationAttemptStateDataProto getProto(); public abstract ApplicationAttemptStateDataProto getProto();
@ -157,4 +161,28 @@ public abstract class ApplicationAttemptStateData {
public abstract int getAMContainerExitStatus(); public abstract int getAMContainerExitStatus();
public abstract void setAMContainerExitStatus(int exitStatus); public abstract void setAMContainerExitStatus(int exitStatus);
/**
* Get the <em>memory seconds</em> (in MB seconds) of the application.
* @return <em>memory seconds</em> (in MB seconds) of the application
*/
@Public
@Unstable
public abstract long getMemorySeconds();
@Public
@Unstable
public abstract void setMemorySeconds(long memorySeconds);
/**
* Get the <em>vcore seconds</em> of the application.
* @return <em>vcore seconds</em> of the application
*/
@Public
@Unstable
public abstract long getVcoreSeconds();
@Public
@Unstable
public abstract void setVcoreSeconds(long vcoreSeconds);
} }

View File

@ -228,6 +228,30 @@ public class ApplicationAttemptStateDataPBImpl extends
builder.setStartTime(startTime); builder.setStartTime(startTime);
} }
@Override
public long getMemorySeconds() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
return p.getMemorySeconds();
}
@Override
public long getVcoreSeconds() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
return p.getVcoreSeconds();
}
@Override
public void setMemorySeconds(long memorySeconds) {
maybeInitBuilder();
builder.setMemorySeconds(memorySeconds);
}
@Override
public void setVcoreSeconds(long vcoreSeconds) {
maybeInitBuilder();
builder.setVcoreSeconds(vcoreSeconds);
}
@Override @Override
public FinalApplicationStatus getFinalApplicationStatus() { public FinalApplicationStatus getFinalApplicationStatus() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.Appli
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -561,6 +562,10 @@ public class RMAppImpl implements RMApp, Recoverable {
} }
} }
} }
RMAppMetrics rmAppMetrics = getRMAppMetrics();
appUsageReport.setMemorySeconds(rmAppMetrics.getMemorySeconds());
appUsageReport.setVcoreSeconds(rmAppMetrics.getVcoreSeconds());
} }
if (currentApplicationAttemptId == null) { if (currentApplicationAttemptId == null) {
@ -1117,7 +1122,6 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override @Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) { public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (!app.submissionContext.getUnmanagedAM() if (!app.submissionContext.getUnmanagedAM()
&& app.getNumFailedAppAttempts() < app.maxAppAttempts) { && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
boolean transferStateFromPreviousAttempt = false; boolean transferStateFromPreviousAttempt = false;
@ -1199,6 +1203,8 @@ public class RMAppImpl implements RMApp, Recoverable {
Resource resourcePreempted = Resource.newInstance(0, 0); Resource resourcePreempted = Resource.newInstance(0, 0);
int numAMContainerPreempted = 0; int numAMContainerPreempted = 0;
int numNonAMContainerPreempted = 0; int numNonAMContainerPreempted = 0;
long memorySeconds = 0;
long vcoreSeconds = 0;
for (RMAppAttempt attempt : attempts.values()) { for (RMAppAttempt attempt : attempts.values()) {
if (null != attempt) { if (null != attempt) {
RMAppAttemptMetrics attemptMetrics = RMAppAttemptMetrics attemptMetrics =
@ -1208,10 +1214,17 @@ public class RMAppImpl implements RMApp, Recoverable {
numAMContainerPreempted += attemptMetrics.getIsPreempted() ? 1 : 0; numAMContainerPreempted += attemptMetrics.getIsPreempted() ? 1 : 0;
numNonAMContainerPreempted += numNonAMContainerPreempted +=
attemptMetrics.getNumNonAMContainersPreempted(); attemptMetrics.getNumNonAMContainersPreempted();
// getAggregateAppResourceUsage() will calculate resource usage stats
// for both running and finished containers.
AggregateAppResourceUsage resUsage =
attempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
memorySeconds += resUsage.getMemorySeconds();
vcoreSeconds += resUsage.getVcoreSeconds();
} }
} }
return new RMAppMetrics(resourcePreempted, return new RMAppMetrics(resourcePreempted,
numNonAMContainerPreempted, numAMContainerPreempted); numNonAMContainerPreempted, numAMContainerPreempted,
memorySeconds, vcoreSeconds);
} }
} }

View File

@ -24,12 +24,17 @@ public class RMAppMetrics {
final Resource resourcePreempted; final Resource resourcePreempted;
final int numNonAMContainersPreempted; final int numNonAMContainersPreempted;
final int numAMContainersPreempted; final int numAMContainersPreempted;
final long memorySeconds;
final long vcoreSeconds;
public RMAppMetrics(Resource resourcePreempted, public RMAppMetrics(Resource resourcePreempted,
int numNonAMContainersPreempted, int numAMContainersPreempted) { int numNonAMContainersPreempted, int numAMContainersPreempted,
long memorySeconds, long vcoreSeconds) {
this.resourcePreempted = resourcePreempted; this.resourcePreempted = resourcePreempted;
this.numNonAMContainersPreempted = numNonAMContainersPreempted; this.numNonAMContainersPreempted = numNonAMContainersPreempted;
this.numAMContainersPreempted = numAMContainersPreempted; this.numAMContainersPreempted = numAMContainersPreempted;
this.memorySeconds = memorySeconds;
this.vcoreSeconds = vcoreSeconds;
} }
public Resource getResourcePreempted() { public Resource getResourcePreempted() {
@ -43,4 +48,12 @@ public class RMAppMetrics {
public int getNumAMContainersPreempted() { public int getNumAMContainersPreempted() {
return numAMContainersPreempted; return numAMContainersPreempted;
} }
public long getMemorySeconds() {
return memorySeconds;
}
public long getVcoreSeconds() {
return vcoreSeconds;
}
} }

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@Private
public class AggregateAppResourceUsage {
long memorySeconds;
long vcoreSeconds;
public AggregateAppResourceUsage(long memorySeconds, long vcoreSeconds) {
this.memorySeconds = memorySeconds;
this.vcoreSeconds = vcoreSeconds;
}
/**
* @return the memorySeconds
*/
public long getMemorySeconds() {
return memorySeconds;
}
/**
* @param memorySeconds the memorySeconds to set
*/
public void setMemorySeconds(long memorySeconds) {
this.memorySeconds = memorySeconds;
}
/**
* @return the vcoreSeconds
*/
public long getVcoreSeconds() {
return vcoreSeconds;
}
/**
* @param vcoreSeconds the vcoreSeconds to set
*/
public void setVcoreSeconds(long vcoreSeconds) {
this.vcoreSeconds = vcoreSeconds;
}
}

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@ -430,7 +431,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.proxiedTrackingUrl = generateProxyUriWithScheme(null); this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
this.maybeLastAttempt = maybeLastAttempt; this.maybeLastAttempt = maybeLastAttempt;
this.stateMachine = stateMachineFactory.make(this); this.stateMachine = stateMachineFactory.make(this);
this.attemptMetrics = new RMAppAttemptMetrics(applicationAttemptId); this.attemptMetrics =
new RMAppAttemptMetrics(applicationAttemptId, rmContext);
} }
@Override @Override
@ -704,6 +706,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
if (report == null) { if (report == null) {
report = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; report = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
} }
AggregateAppResourceUsage resUsage =
this.attemptMetrics.getAggregateAppResourceUsage();
report.setMemorySeconds(resUsage.getMemorySeconds());
report.setVcoreSeconds(resUsage.getVcoreSeconds());
return report; return report;
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
@ -733,6 +739,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus(); this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime(); this.startTime = attemptState.getStartTime();
this.attemptMetrics.updateAggregateAppResourceUsage(
attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
} }
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
@ -1017,12 +1025,14 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
default: default:
break; break;
} }
AggregateAppResourceUsage resUsage =
this.attemptMetrics.getAggregateAppResourceUsage();
RMStateStore rmStore = rmContext.getStateStore(); RMStateStore rmStore = rmContext.getStateStore();
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime, rmStore.getCredentialsFromAppAttempt(this), startTime,
stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus); stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus,
resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
LOG.info("Updating application attempt " + applicationAttemptId LOG.info("Updating application attempt " + applicationAttemptId
+ " with final state: " + targetedFinalState + ", and exit status: " + " with final state: " + targetedFinalState + ", and exit status: "
+ exitStatus); + exitStatus);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -27,7 +28,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -42,12 +45,17 @@ public class RMAppAttemptMetrics {
private ReadLock readLock; private ReadLock readLock;
private WriteLock writeLock; private WriteLock writeLock;
private AtomicLong finishedMemorySeconds = new AtomicLong(0);
private AtomicLong finishedVcoreSeconds = new AtomicLong(0);
private RMContext rmContext;
public RMAppAttemptMetrics(ApplicationAttemptId attemptId) { public RMAppAttemptMetrics(ApplicationAttemptId attemptId,
RMContext rmContext) {
this.attemptId = attemptId; this.attemptId = attemptId;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock(); this.readLock = lock.readLock();
this.writeLock = lock.writeLock(); this.writeLock = lock.writeLock();
this.rmContext = rmContext;
} }
public void updatePreemptionInfo(Resource resource, RMContainer container) { public void updatePreemptionInfo(Resource resource, RMContainer container) {
@ -94,4 +102,28 @@ public class RMAppAttemptMetrics {
public boolean getIsPreempted() { public boolean getIsPreempted() {
return this.isPreempted.get(); return this.isPreempted.get();
} }
public AggregateAppResourceUsage getAggregateAppResourceUsage() {
long memorySeconds = finishedMemorySeconds.get();
long vcoreSeconds = finishedVcoreSeconds.get();
// Only add in the running containers if this is the active attempt.
RMAppAttempt currentAttempt = rmContext.getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
if (currentAttempt.getAppAttemptId().equals(attemptId)) {
ApplicationResourceUsageReport appResUsageReport = rmContext
.getScheduler().getAppResourceUsageReport(attemptId);
if (appResUsageReport != null) {
memorySeconds += appResUsageReport.getMemorySeconds();
vcoreSeconds += appResUsageReport.getVcoreSeconds();
}
}
return new AggregateAppResourceUsage(memorySeconds, vcoreSeconds);
}
public void updateAggregateAppResourceUsage(long finishedMemorySeconds,
long finishedVcoreSeconds) {
this.finishedMemorySeconds.addAndGet(finishedMemorySeconds);
this.finishedVcoreSeconds.addAndGet(finishedVcoreSeconds);
}
} }

View File

@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
@ -488,7 +490,7 @@ public class RMContainerImpl implements RMContainer {
// Inform AppAttempt // Inform AppAttempt
// container.getContainer() can return null when a RMContainer is a // container.getContainer() can return null when a RMContainer is a
// reserved container // reserved container
updateMetricsIfPreempted(container); updateAttemptMetrics(container);
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
container.appAttemptId, finishedEvent.getRemoteContainerStatus())); container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
@ -497,19 +499,27 @@ public class RMContainerImpl implements RMContainer {
container); container);
} }
private static void updateMetricsIfPreempted(RMContainerImpl container) { private static void updateAttemptMetrics(RMContainerImpl container) {
// If this is a preempted container, update preemption metrics // If this is a preempted container, update preemption metrics
if (ContainerExitStatus.PREEMPTED == container.finishedStatus
.getExitStatus()) {
Resource resource = container.getContainer().getResource(); Resource resource = container.getContainer().getResource();
RMAppAttempt rmAttempt = RMAppAttempt rmAttempt = container.rmContext.getRMApps()
container.rmContext.getRMApps()
.get(container.getApplicationAttemptId().getApplicationId()) .get(container.getApplicationAttemptId().getApplicationId())
.getCurrentAppAttempt(); .getCurrentAppAttempt();
if (ContainerExitStatus.PREEMPTED == container.finishedStatus
.getExitStatus()) {
rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource, rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
container); container);
} }
if (rmAttempt != null) {
long usedMillis = container.finishTime - container.creationTime;
long memorySeconds = resource.getMemory()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
long vcoreSeconds = resource.getVirtualCores()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
rmAttempt.getRMAppAttemptMetrics()
.updateAggregateAppResourceUsage(memorySeconds,vcoreSeconds);
}
} }
} }

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
@ -69,6 +71,11 @@ public class SchedulerApplicationAttempt {
private static final Log LOG = LogFactory private static final Log LOG = LogFactory
.getLog(SchedulerApplicationAttempt.class); .getLog(SchedulerApplicationAttempt.class);
private static final long MEM_AGGREGATE_ALLOCATION_CACHE_MSECS = 3000;
protected long lastMemoryAggregateAllocationUpdateTime = 0;
private long lastMemorySeconds = 0;
private long lastVcoreSeconds = 0;
protected final AppSchedulingInfo appSchedulingInfo; protected final AppSchedulingInfo appSchedulingInfo;
protected Map<ContainerId, RMContainer> liveContainers = protected Map<ContainerId, RMContainer> liveContainers =
@ -506,11 +513,37 @@ public class SchedulerApplicationAttempt {
schedulingOpportunities.setCount(priority, 0); schedulingOpportunities.setCount(priority, 0);
} }
synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
long currentTimeMillis = System.currentTimeMillis();
// Don't walk the whole container list if the resources were computed
// recently.
if ((currentTimeMillis - lastMemoryAggregateAllocationUpdateTime)
> MEM_AGGREGATE_ALLOCATION_CACHE_MSECS) {
long memorySeconds = 0;
long vcoreSeconds = 0;
for (RMContainer rmContainer : this.liveContainers.values()) {
long usedMillis = currentTimeMillis - rmContainer.getCreationTime();
Resource resource = rmContainer.getContainer().getResource();
memorySeconds += resource.getMemory() * usedMillis /
DateUtils.MILLIS_PER_SECOND;
vcoreSeconds += resource.getVirtualCores() * usedMillis
/ DateUtils.MILLIS_PER_SECOND;
}
lastMemoryAggregateAllocationUpdateTime = currentTimeMillis;
lastMemorySeconds = memorySeconds;
lastVcoreSeconds = vcoreSeconds;
}
return new AggregateAppResourceUsage(lastMemorySeconds, lastVcoreSeconds);
}
public synchronized ApplicationResourceUsageReport getResourceUsageReport() { public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
AggregateAppResourceUsage resUsage = getRunningAggregateAppResourceUsage();
return ApplicationResourceUsageReport.newInstance(liveContainers.size(), return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
reservedContainers.size(), Resources.clone(currentConsumption), reservedContainers.size(), Resources.clone(currentConsumption),
Resources.clone(currentReservation), Resources.clone(currentReservation),
Resources.add(currentConsumption, currentReservation)); Resources.add(currentConsumption, currentReservation),
resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
} }
public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() { public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {

View File

@ -106,6 +106,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
queue.getMetrics().releaseResources(getUser(), 1, containerResource); queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource); Resources.subtractFrom(currentConsumption, containerResource);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
return true; return true;
} }

View File

@ -146,6 +146,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// remove from preemption map if it is completed // remove from preemption map if it is completed
preemptionMap.remove(rmContainer); preemptionMap.remove(rmContainer);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
} }
private synchronized void unreserveInternal( private synchronized void unreserveInternal(

View File

@ -146,7 +146,10 @@ public class AppBlock extends HtmlBlock {
attemptMetrics.getResourcePreempted()) attemptMetrics.getResourcePreempted())
._("Number of Non-AM Containers Preempted from Current Attempt:", ._("Number of Non-AM Containers Preempted from Current Attempt:",
String.valueOf(attemptMetrics String.valueOf(attemptMetrics
.getNumNonAMContainersPreempted())); .getNumNonAMContainersPreempted()))
._("Aggregate Resource Allocation:",
String.format("%d MB-seconds, %d vcore-seconds",
appMerics.getMemorySeconds(), appMerics.getVcoreSeconds()));
pdiv._(); pdiv._();
Collection<RMAppAttempt> attempts = rmApp.getAppAttempts().values(); Collection<RMAppAttempt> attempts = rmApp.getAppAttempts().values();

View File

@ -79,6 +79,8 @@ public class AppInfo {
protected int allocatedMB; protected int allocatedMB;
protected int allocatedVCores; protected int allocatedVCores;
protected int runningContainers; protected int runningContainers;
protected long memorySeconds;
protected long vcoreSeconds;
// preemption info fields // preemption info fields
protected int preemptedResourceMB; protected int preemptedResourceMB;
@ -165,6 +167,8 @@ public class AppInfo {
appMetrics.getNumNonAMContainersPreempted(); appMetrics.getNumNonAMContainersPreempted();
preemptedResourceVCores = preemptedResourceVCores =
appMetrics.getResourcePreempted().getVirtualCores(); appMetrics.getResourcePreempted().getVirtualCores();
memorySeconds = appMetrics.getMemorySeconds();
vcoreSeconds = appMetrics.getVcoreSeconds();
} }
} }
@ -287,4 +291,12 @@ public class AppInfo {
public int getNumAMContainersPreempted() { public int getNumAMContainersPreempted() {
return numAMContainerPreempted; return numAMContainerPreempted;
} }
public long getMemorySeconds() {
return memorySeconds;
}
public long getVcoreSeconds() {
return vcoreSeconds;
}
} }

View File

@ -78,6 +78,8 @@ message ApplicationAttemptStateDataProto {
optional int64 start_time = 7; optional int64 start_time = 7;
optional FinalApplicationStatusProto final_application_status = 8; optional FinalApplicationStatusProto final_application_status = 8;
optional int32 am_container_exit_status = 9 [default = -1000]; optional int32 am_container_exit_status = 9 [default = -1000];
optional int64 memory_seconds = 10;
optional int64 vcore_seconds = 11;
} }
message EpochProto { message EpochProto {

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRes
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
@ -79,6 +80,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -223,7 +225,7 @@ public class TestClientRMService {
} }
@Test @Test
public void testGetApplicationReport() throws YarnException { public void testNonExistingApplicationReport() throws YarnException {
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
when(rmContext.getRMApps()).thenReturn( when(rmContext.getRMApps()).thenReturn(
new ConcurrentHashMap<ApplicationId, RMApp>()); new ConcurrentHashMap<ApplicationId, RMApp>());
@ -243,6 +245,38 @@ public class TestClientRMService {
} }
} }
@Test
public void testGetApplicationReport() throws Exception {
YarnScheduler yarnScheduler = mock(YarnScheduler.class);
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
ApplicationId appId1 = getApplicationId(1);
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
when(
mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true);
ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler,
null, mockAclsManager, null, null);
try {
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId1);
GetApplicationReportResponse response =
rmService.getApplicationReport(request);
ApplicationReport report = response.getApplicationReport();
ApplicationResourceUsageReport usageReport =
report.getApplicationResourceUsageReport();
Assert.assertEquals(10, usageReport.getMemorySeconds());
Assert.assertEquals(3, usageReport.getVcoreSeconds());
} finally {
rmService.close();
}
}
@Test @Test
public void testGetApplicationAttemptReport() throws YarnException, public void testGetApplicationAttemptReport() throws YarnException,
IOException { IOException {
@ -1065,11 +1099,11 @@ public class TestClientRMService {
ApplicationId applicationId3 = getApplicationId(3); ApplicationId applicationId3 = getApplicationId(3);
YarnConfiguration config = new YarnConfiguration(); YarnConfiguration config = new YarnConfiguration();
apps.put(applicationId1, getRMApp(rmContext, yarnScheduler, applicationId1, apps.put(applicationId1, getRMApp(rmContext, yarnScheduler, applicationId1,
config, "testqueue")); config, "testqueue", 10, 3));
apps.put(applicationId2, getRMApp(rmContext, yarnScheduler, applicationId2, apps.put(applicationId2, getRMApp(rmContext, yarnScheduler, applicationId2,
config, "a")); config, "a", 20, 2));
apps.put(applicationId3, getRMApp(rmContext, yarnScheduler, applicationId3, apps.put(applicationId3, getRMApp(rmContext, yarnScheduler, applicationId3,
config, "testqueue")); config, "testqueue", 40, 5));
return apps; return apps;
} }
@ -1091,12 +1125,26 @@ public class TestClientRMService {
} }
private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler, private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
ApplicationId applicationId3, YarnConfiguration config, String queueName) { ApplicationId applicationId3, YarnConfiguration config, String queueName,
final long memorySeconds, final long vcoreSeconds) {
ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class); ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class);
when(asContext.getMaxAppAttempts()).thenReturn(1); when(asContext.getMaxAppAttempts()).thenReturn(1);
RMAppImpl app = spy(new RMAppImpl(applicationId3, rmContext, config, null, RMAppImpl app = spy(new RMAppImpl(applicationId3, rmContext, config, null,
null, queueName, asContext, yarnScheduler, null, null, queueName, asContext, yarnScheduler, null,
System.currentTimeMillis(), "YARN", null)); System.currentTimeMillis(), "YARN", null) {
@Override
public ApplicationReport createAndGetApplicationReport(
String clientUserName, boolean allowAccess) {
ApplicationReport report = super.createAndGetApplicationReport(
clientUserName, allowAccess);
ApplicationResourceUsageReport usageReport =
report.getApplicationResourceUsageReport();
usageReport.setMemorySeconds(memorySeconds);
usageReport.setVcoreSeconds(vcoreSeconds);
report.setApplicationResourceUsageReport(usageReport);
return report;
}
});
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(123456, 1), 1); ApplicationId.newInstance(123456, 1), 1);
RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId, RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId,

View File

@ -0,0 +1,401 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang.time.DateUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestContainerResourceUsage {
private YarnConfiguration conf;
@Before
public void setup() throws UnknownHostException {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
conf = new YarnConfiguration();
UserGroupInformation.setConfiguration(conf);
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
}
@After
public void tearDown() {
}
@Test (timeout = 60000)
public void testUsageWithOneAttemptAndOneContainer() throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm.registerNode();
RMApp app0 = rm.submitApp(200);
RMAppMetrics rmAppMetrics = app0.getRMAppMetrics();
Assert.assertTrue(
"Before app submittion, memory seconds should have been 0 but was "
+ rmAppMetrics.getMemorySeconds(),
rmAppMetrics.getMemorySeconds() == 0);
Assert.assertTrue(
"Before app submission, vcore seconds should have been 0 but was "
+ rmAppMetrics.getVcoreSeconds(),
rmAppMetrics.getVcoreSeconds() == 0);
RMAppAttempt attempt0 = app0.getCurrentAppAttempt();
nm.nodeHeartbeat(true);
MockAM am0 = rm.sendAMLaunched(attempt0.getAppAttemptId());
am0.registerAppAttempt();
RMContainer rmContainer =
rm.getResourceScheduler()
.getRMContainer(attempt0.getMasterContainer().getId());
// Allow metrics to accumulate.
Thread.sleep(1000);
rmAppMetrics = app0.getRMAppMetrics();
Assert.assertTrue(
"While app is running, memory seconds should be >0 but is "
+ rmAppMetrics.getMemorySeconds(),
rmAppMetrics.getMemorySeconds() > 0);
Assert.assertTrue(
"While app is running, vcore seconds should be >0 but is "
+ rmAppMetrics.getVcoreSeconds(),
rmAppMetrics.getVcoreSeconds() > 0);
MockRM.finishAMAndVerifyAppState(app0, rm, nm, am0);
AggregateAppResourceUsage ru = calculateContainerResourceMetrics(rmContainer);
rmAppMetrics = app0.getRMAppMetrics();
Assert.assertEquals("Unexcpected MemorySeconds value",
ru.getMemorySeconds(), rmAppMetrics.getMemorySeconds());
Assert.assertEquals("Unexpected VcoreSeconds value",
ru.getVcoreSeconds(), rmAppMetrics.getVcoreSeconds());
rm.stop();
}
@Test (timeout = 60000)
public void testUsageWithMultipleContainersAndRMRestart() throws Exception {
// Set max attempts to 1 so that when the first attempt fails, the app
// won't try to start a new one.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm0 = new MockRM(conf, memStore);
rm0.start();
MockNM nm =
new MockNM("127.0.0.1:1234", 65536, rm0.getResourceTrackerService());
nm.registerNode();
RMApp app0 = rm0.submitApp(200);
rm0.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt0 = app0.getCurrentAppAttempt();
ApplicationAttemptId attemptId0 = attempt0.getAppAttemptId();
rm0.waitForState(attemptId0, RMAppAttemptState.SCHEDULED);
nm.nodeHeartbeat(true);
rm0.waitForState(attemptId0, RMAppAttemptState.ALLOCATED);
MockAM am0 = rm0.sendAMLaunched(attempt0.getAppAttemptId());
am0.registerAppAttempt();
int NUM_CONTAINERS = 2;
am0.allocate("127.0.0.1" , 1000, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm.nodeHeartbeat(true);
List<Container> conts = am0.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (conts.size() != NUM_CONTAINERS) {
nm.nodeHeartbeat(true);
conts.addAll(am0.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(500);
}
// launch the 2nd and 3rd containers.
for (Container c : conts) {
nm.nodeHeartbeat(attempt0.getAppAttemptId(),
c.getId().getId(), ContainerState.RUNNING);
rm0.waitForState(nm, c.getId(), RMContainerState.RUNNING);
}
// Get the RMContainers for all of the live containers, to be used later
// for metrics calculations and comparisons.
Collection<RMContainer> rmContainers =
rm0.scheduler
.getSchedulerAppInfo(attempt0.getAppAttemptId())
.getLiveContainers();
// Give the metrics time to accumulate.
Thread.sleep(1000);
// Stop all non-AM containers
for (Container c : conts) {
if (c.getId().getId() == 1) continue;
nm.nodeHeartbeat(attempt0.getAppAttemptId(),
c.getId().getId(), ContainerState.COMPLETE);
rm0.waitForState(nm, c.getId(), RMContainerState.COMPLETED);
}
// After all other containers have completed, manually complete the master
// container in order to trigger a save to the state store of the resource
// usage metrics. This will cause the attempt to fail, and, since the max
// attempt retries is 1, the app will also fail. This is intentional so
// that all containers will complete prior to saving.
ContainerId cId = ContainerId.newInstance(attempt0.getAppAttemptId(), 1);
nm.nodeHeartbeat(attempt0.getAppAttemptId(),
cId.getId(), ContainerState.COMPLETE);
rm0.waitForState(nm, cId, RMContainerState.COMPLETED);
// Check that the container metrics match those from the app usage report.
long memorySeconds = 0;
long vcoreSeconds = 0;
for (RMContainer c : rmContainers) {
AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c);
memorySeconds += ru.getMemorySeconds();
vcoreSeconds += ru.getVcoreSeconds();
}
RMAppMetrics metricsBefore = app0.getRMAppMetrics();
Assert.assertEquals("Unexcpected MemorySeconds value",
memorySeconds, metricsBefore.getMemorySeconds());
Assert.assertEquals("Unexpected VcoreSeconds value",
vcoreSeconds, metricsBefore.getVcoreSeconds());
// create new RM to represent RM restart. Load up the state store.
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
RMApp app0After =
rm1.getRMContext().getRMApps().get(app0.getApplicationId());
// Compare container resource usage metrics from before and after restart.
RMAppMetrics metricsAfter = app0After.getRMAppMetrics();
Assert.assertEquals("Vcore seconds were not the same after RM Restart",
metricsBefore.getVcoreSeconds(), metricsAfter.getVcoreSeconds());
Assert.assertEquals("Memory seconds were not the same after RM Restart",
metricsBefore.getMemorySeconds(), metricsAfter.getMemorySeconds());
rm0.stop();
rm0.close();
rm1.stop();
rm1.close();
}
@Test(timeout = 60000)
public void testUsageAfterAMRestartWithMultipleContainers() throws Exception {
amRestartTests(false);
}
@Test(timeout = 60000)
public void testUsageAfterAMRestartKeepContainers() throws Exception {
amRestartTests(true);
}
private void amRestartTests(boolean keepRunningContainers)
throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
RMApp app =
rm.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
null, "MAPREDUCE", false, keepRunningContainers);
MockNM nm =
new MockNM("127.0.0.1:1234", 10240, rm.getResourceTrackerService());
nm.registerNode();
MockAM am0 = MockRM.launchAndRegisterAM(app, rm, nm);
int NUM_CONTAINERS = 1;
// allocate NUM_CONTAINERS containers
am0.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm.nodeHeartbeat(true);
// wait for containers to be allocated.
List<Container> containers =
am0.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.size() != NUM_CONTAINERS) {
nm.nodeHeartbeat(true);
containers.addAll(am0.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(200);
}
// launch the 2nd container.
ContainerId containerId2 =
ContainerId.newInstance(am0.getApplicationAttemptId(), 2);
nm.nodeHeartbeat(am0.getApplicationAttemptId(),
containerId2.getId(), ContainerState.RUNNING);
rm.waitForState(nm, containerId2, RMContainerState.RUNNING);
// Capture the containers here so the metrics can be calculated after the
// app has completed.
Collection<RMContainer> rmContainers =
rm.scheduler
.getSchedulerAppInfo(am0.getApplicationAttemptId())
.getLiveContainers();
// fail the first app attempt by sending CONTAINER_FINISHED event without
// registering.
ContainerId amContainerId =
app.getCurrentAppAttempt().getMasterContainer().getId();
nm.nodeHeartbeat(am0.getApplicationAttemptId(),
amContainerId.getId(), ContainerState.COMPLETE);
am0.waitForState(RMAppAttemptState.FAILED);
long memorySeconds = 0;
long vcoreSeconds = 0;
// Calculate container usage metrics for first attempt.
if (keepRunningContainers) {
// Only calculate the usage for the one container that has completed.
for (RMContainer c : rmContainers) {
if (c.getContainerId().equals(amContainerId)) {
AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c);
memorySeconds += ru.getMemorySeconds();
vcoreSeconds += ru.getVcoreSeconds();
} else {
// The remaining container should be RUNNING.
Assert.assertTrue("After first attempt failed, remaining container "
+ "should still be running. ",
c.getContainerState().equals(ContainerState.RUNNING));
}
}
} else {
// If keepRunningContainers is false, all live containers should now
// be completed. Calculate the resource usage metrics for all of them.
for (RMContainer c : rmContainers) {
AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c);
memorySeconds += ru.getMemorySeconds();
vcoreSeconds += ru.getVcoreSeconds();
}
}
// wait for app to start a new attempt.
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
// assert this is a new AM.
RMAppAttempt attempt2 = app.getCurrentAppAttempt();
Assert.assertFalse(attempt2.getAppAttemptId()
.equals(am0.getApplicationAttemptId()));
// launch the new AM
nm.nodeHeartbeat(true);
MockAM am1 = rm.sendAMLaunched(attempt2.getAppAttemptId());
am1.registerAppAttempt();
// allocate NUM_CONTAINERS containers
am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm.nodeHeartbeat(true);
// wait for containers to be allocated.
containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.size() != NUM_CONTAINERS) {
nm.nodeHeartbeat(true);
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(200);
}
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
// Capture running containers for later use by metrics calculations.
rmContainers = rm.scheduler.getSchedulerAppInfo(attempt2.getAppAttemptId())
.getLiveContainers();
// complete container by sending the container complete event which has
// earlier attempt's attemptId
amContainerId = app.getCurrentAppAttempt().getMasterContainer().getId();
nm.nodeHeartbeat(am0.getApplicationAttemptId(),
amContainerId.getId(), ContainerState.COMPLETE);
MockRM.finishAMAndVerifyAppState(app, rm, nm, am1);
// Calculate container usage metrics for second attempt.
for (RMContainer c : rmContainers) {
AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c);
memorySeconds += ru.getMemorySeconds();
vcoreSeconds += ru.getVcoreSeconds();
}
RMAppMetrics rmAppMetrics = app.getRMAppMetrics();
Assert.assertEquals("Unexcpected MemorySeconds value",
memorySeconds, rmAppMetrics.getMemorySeconds());
Assert.assertEquals("Unexpected VcoreSeconds value",
vcoreSeconds, rmAppMetrics.getVcoreSeconds());
rm.stop();
return;
}
private AggregateAppResourceUsage calculateContainerResourceMetrics(
RMContainer rmContainer) {
Resource resource = rmContainer.getContainer().getResource();
long usedMillis =
rmContainer.getFinishTime() - rmContainer.getCreationTime();
long memorySeconds = resource.getMemory()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
long vcoreSeconds = resource.getVirtualCores()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
return new AggregateAppResourceUsage(memorySeconds, vcoreSeconds);
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -170,7 +171,7 @@ public abstract class MockAsm extends MockApps {
@Override @Override
public RMAppMetrics getRMAppMetrics() { public RMAppMetrics getRMAppMetrics() {
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0); return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0);
} }
} }
@ -259,6 +260,22 @@ public abstract class MockAsm extends MockApps {
public Set<String> getApplicationTags() { public Set<String> getApplicationTags() {
return null; return null;
} }
@Override
public ApplicationReport createAndGetApplicationReport(
String clientUserName, boolean allowAccess) {
ApplicationResourceUsageReport usageReport =
ApplicationResourceUsageReport.newInstance(0, 0, null, null, null,
0, 0);
ApplicationReport report = ApplicationReport.newInstance(
getApplicationId(), appAttemptId, getUser(), getQueue(),
getName(), null, 0, null, null, getDiagnostics().toString(),
getTrackingUrl(), getStartTime(), getFinishTime(),
getFinalApplicationStatus(), usageReport , null, getProgress(),
type, null);
return report;
}
}; };
} }

View File

@ -64,7 +64,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMSta
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@ -152,6 +154,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
SecretKey clientTokenMasterKey, TestDispatcher dispatcher) SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
throws Exception { throws Exception {
RMAppAttemptMetrics mockRmAppAttemptMetrics =
mock(RMAppAttemptMetrics.class);
Container container = new ContainerPBImpl(); Container container = new ContainerPBImpl();
container.setId(ConverterUtils.toContainerId(containerIdStr)); container.setId(ConverterUtils.toContainerId(containerIdStr));
RMAppAttempt mockAttempt = mock(RMAppAttempt.class); RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
@ -160,6 +164,10 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
when(mockAttempt.getAMRMToken()).thenReturn(appToken); when(mockAttempt.getAMRMToken()).thenReturn(appToken);
when(mockAttempt.getClientTokenMasterKey()) when(mockAttempt.getClientTokenMasterKey())
.thenReturn(clientTokenMasterKey); .thenReturn(clientTokenMasterKey);
when(mockAttempt.getRMAppAttemptMetrics())
.thenReturn(mockRmAppAttemptMetrics);
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
.thenReturn(new AggregateAppResourceUsage(0,0));
dispatcher.attemptId = attemptId; dispatcher.attemptId = attemptId;
store.storeNewApplicationAttempt(mockAttempt); store.storeNewApplicationAttempt(mockAttempt);
waitNotify(dispatcher); waitNotify(dispatcher);
@ -224,6 +232,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
"container_1352994193343_0002_01_000001", null, null, dispatcher); "container_1352994193343_0002_01_000001", null, null, dispatcher);
RMApp mockRemovedApp = mock(RMApp.class); RMApp mockRemovedApp = mock(RMApp.class);
RMAppAttemptMetrics mockRmAppAttemptMetrics =
mock(RMAppAttemptMetrics.class);
HashMap<ApplicationAttemptId, RMAppAttempt> attempts = HashMap<ApplicationAttemptId, RMAppAttempt> attempts =
new HashMap<ApplicationAttemptId, RMAppAttempt>(); new HashMap<ApplicationAttemptId, RMAppAttempt>();
ApplicationSubmissionContext context = ApplicationSubmissionContext context =
@ -234,6 +244,10 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
when(mockRemovedApp.getAppAttempts()).thenReturn(attempts); when(mockRemovedApp.getAppAttempts()).thenReturn(attempts);
RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class); RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class);
when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved); when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved);
when(mockRemovedAttempt.getRMAppAttemptMetrics())
.thenReturn(mockRmAppAttemptMetrics);
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
.thenReturn(new AggregateAppResourceUsage(0,0));
attempts.put(attemptIdRemoved, mockRemovedAttempt); attempts.put(attemptIdRemoved, mockRemovedAttempt);
store.removeApplication(mockRemovedApp); store.removeApplication(mockRemovedApp);
@ -304,7 +318,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics", "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 100); FinalApplicationStatus.SUCCEEDED, 100, 0, 0);
store.updateApplicationAttemptState(newAttemptState); store.updateApplicationAttemptState(newAttemptState);
// test updating the state of an app/attempt whose initial state was not // test updating the state of an app/attempt whose initial state was not
@ -327,7 +341,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics", "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 111); FinalApplicationStatus.SUCCEEDED, 111, 0, 0);
store.updateApplicationAttemptState(dummyAttempt); store.updateApplicationAttemptState(dummyAttempt);
// let things settle down // let things settle down

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp; package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -38,6 +40,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -61,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@ -74,6 +78,7 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
@ -189,7 +194,7 @@ public class TestRMAppTransitions {
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
store = mock(RMStateStore.class); store = mock(RMStateStore.class);
writer = mock(RMApplicationHistoryWriter.class); writer = mock(RMApplicationHistoryWriter.class);
this.rmContext = RMContext realRMContext =
new RMContextImpl(rmDispatcher, new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new AMRMTokenSecretManager(conf, this.rmContext), null, new AMRMTokenSecretManager(conf, this.rmContext),
@ -197,7 +202,14 @@ public class TestRMAppTransitions {
new NMTokenSecretManagerInRM(conf), new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), new ClientToAMTokenSecretManagerInRM(),
writer); writer);
((RMContextImpl)rmContext).setStateStore(store); ((RMContextImpl)realRMContext).setStateStore(store);
this.rmContext = spy(realRMContext);
ResourceScheduler resourceScheduler = mock(ResourceScheduler.class);
doReturn(null).when(resourceScheduler)
.getAppResourceUsageReport((ApplicationAttemptId)Matchers.any());
doReturn(resourceScheduler).when(rmContext).getScheduler();
rmDispatcher.register(RMAppAttemptEventType.class, rmDispatcher.register(RMAppAttemptEventType.class,
new TestApplicationAttemptEventDispatcher(this.rmContext)); new TestApplicationAttemptEventDispatcher(this.rmContext));

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -49,6 +50,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -87,6 +89,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -107,6 +111,8 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
public class TestRMAppAttemptTransitions { public class TestRMAppAttemptTransitions {
@ -120,7 +126,9 @@ public class TestRMAppAttemptTransitions {
private boolean isSecurityEnabled; private boolean isSecurityEnabled;
private RMContext rmContext; private RMContext rmContext;
private RMContext spyRMContext;
private YarnScheduler scheduler; private YarnScheduler scheduler;
private ResourceScheduler resourceScheduler;
private ApplicationMasterService masterService; private ApplicationMasterService masterService;
private ApplicationMasterLauncher applicationMasterLauncher; private ApplicationMasterLauncher applicationMasterLauncher;
private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amLivelinessMonitor;
@ -263,6 +271,19 @@ public class TestRMAppAttemptTransitions {
ApplicationAttemptId applicationAttemptId = ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, 0); ApplicationAttemptId.newInstance(applicationId, 0);
resourceScheduler = mock(ResourceScheduler.class);
ApplicationResourceUsageReport appResUsgRpt =
mock(ApplicationResourceUsageReport.class);
when(appResUsgRpt.getMemorySeconds()).thenReturn(0L);
when(appResUsgRpt.getVcoreSeconds()).thenReturn(0L);
when(resourceScheduler
.getAppResourceUsageReport((ApplicationAttemptId)Matchers.any()))
.thenReturn(appResUsgRpt);
spyRMContext = spy(rmContext);
Mockito.doReturn(resourceScheduler).when(spyRMContext).getScheduler();
final String user = MockApps.newUserName(); final String user = MockApps.newUserName();
final String queue = MockApps.newQueue(); final String queue = MockApps.newQueue();
submissionContext = mock(ApplicationSubmissionContext.class); submissionContext = mock(ApplicationSubmissionContext.class);
@ -278,17 +299,18 @@ public class TestRMAppAttemptTransitions {
application = mock(RMAppImpl.class); application = mock(RMAppImpl.class);
applicationAttempt = applicationAttempt =
new RMAppAttemptImpl(applicationAttemptId, rmContext, scheduler, new RMAppAttemptImpl(applicationAttemptId, spyRMContext, scheduler,
masterService, submissionContext, new Configuration(), false); masterService, submissionContext, new Configuration(), false);
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt); when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
when(application.getApplicationId()).thenReturn(applicationId); when(application.getApplicationId()).thenReturn(applicationId);
spyRMContext.getRMApps().put(application.getApplicationId(), application);
testAppAttemptNewState(); testAppAttemptNewState();
} }
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
((AsyncDispatcher)this.rmContext.getDispatcher()).stop(); ((AsyncDispatcher)this.spyRMContext.getDispatcher()).stop();
} }
@ -698,6 +720,46 @@ public class TestRMAppAttemptTransitions {
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
} }
@Test
public void testUsageReport() {
// scheduler has info on running apps
ApplicationAttemptId attemptId = applicationAttempt.getAppAttemptId();
ApplicationResourceUsageReport appResUsgRpt =
mock(ApplicationResourceUsageReport.class);
when(appResUsgRpt.getMemorySeconds()).thenReturn(123456L);
when(appResUsgRpt.getVcoreSeconds()).thenReturn(55544L);
when(scheduler.getAppResourceUsageReport(any(ApplicationAttemptId.class)))
.thenReturn(appResUsgRpt);
// start and finish the attempt
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
applicationAttempt.handle(new RMAppAttemptUnregistrationEvent(attemptId,
"", FinalApplicationStatus.SUCCEEDED, ""));
// expect usage stats to come from the scheduler report
ApplicationResourceUsageReport report =
applicationAttempt.getApplicationResourceUsageReport();
Assert.assertEquals(123456L, report.getMemorySeconds());
Assert.assertEquals(55544L, report.getVcoreSeconds());
// finish app attempt and remove it from scheduler
when(appResUsgRpt.getMemorySeconds()).thenReturn(223456L);
when(appResUsgRpt.getVcoreSeconds()).thenReturn(75544L);
sendAttemptUpdateSavedEvent(applicationAttempt);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
attemptId,
ContainerStatus.newInstance(
amContainer.getId(), ContainerState.COMPLETE, "", 0)));
when(scheduler.getSchedulerAppInfo(eq(attemptId))).thenReturn(null);
report = applicationAttempt.getApplicationResourceUsageReport();
Assert.assertEquals(223456, report.getMemorySeconds());
Assert.assertEquals(75544, report.getVcoreSeconds());
}
@Test @Test
public void testUnmanagedAMUnexpectedRegistration() { public void testUnmanagedAMUnexpectedRegistration() {
unmanagedAM = true; unmanagedAM = true;
@ -1243,7 +1305,7 @@ public class TestRMAppAttemptTransitions {
public void testContainersCleanupForLastAttempt() { public void testContainersCleanupForLastAttempt() {
// create a failed attempt. // create a failed attempt.
applicationAttempt = applicationAttempt =
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), rmContext, new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext,
scheduler, masterService, submissionContext, new Configuration(), scheduler, masterService, submissionContext, new Configuration(),
true); true);
when(submissionContext.getKeepContainersAcrossApplicationAttempts()) when(submissionContext.getKeepContainersAcrossApplicationAttempts())

View File

@ -23,10 +23,13 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -59,6 +62,8 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public class TestRMContainerImpl { public class TestRMContainerImpl {
@ -86,12 +91,18 @@ public class TestRMContainerImpl {
Container container = BuilderUtils.newContainer(containerId, nodeId, Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null); "host:3465", resource, priority, null);
ConcurrentMap<ApplicationId, RMApp> rmApps =
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
RMApp rmApp = mock(RMApp.class);
when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null);
Mockito.doReturn(rmApp).when(rmApps).get((ApplicationId)Matchers.any());
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher); when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getRMApps()).thenReturn(rmApps);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, "user", rmContext); nodeId, "user", rmContext);

View File

@ -39,12 +39,15 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.junit.Assert; import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -59,6 +62,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -79,6 +83,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -90,6 +96,7 @@ public class TestLeafQueue {
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
RMContext rmContext; RMContext rmContext;
RMContext spyRMContext;
CapacityScheduler cs; CapacityScheduler cs;
CapacitySchedulerConfiguration csConf; CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext; CapacitySchedulerContext csContext;
@ -107,6 +114,14 @@ public class TestLeafQueue {
CapacityScheduler spyCs = new CapacityScheduler(); CapacityScheduler spyCs = new CapacityScheduler();
cs = spy(spyCs); cs = spy(spyCs);
rmContext = TestUtils.getMockRMContext(); rmContext = TestUtils.getMockRMContext();
spyRMContext = spy(rmContext);
ConcurrentMap<ApplicationId, RMApp> spyApps =
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
RMApp rmApp = mock(RMApp.class);
when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null);
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
when(spyRMContext.getRMApps()).thenReturn(spyApps);
csConf = csConf =
new CapacitySchedulerConfiguration(); new CapacitySchedulerConfiguration();
@ -143,7 +158,7 @@ public class TestLeafQueue {
queues, queues, queues, queues,
TestUtils.spyHook); TestUtils.spyHook);
cs.setRMContext(rmContext); cs.setRMContext(spyRMContext);
cs.init(csConf); cs.init(csConf);
cs.start(); cs.start();
} }
@ -280,14 +295,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a, new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
@ -329,14 +344,14 @@ public class TestLeafQueue {
final ApplicationAttemptId appAttemptId_0 = TestUtils final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 1); .getMockApplicationAttemptId(0, 1);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null, FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null,
rmContext); spyRMContext);
d.submitApplicationAttempt(app_0, user_d); d.submitApplicationAttempt(app_0, user_d);
// Attempt the same application again // Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2); .getMockApplicationAttemptId(0, 2);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null, FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null,
rmContext); spyRMContext);
d.submitApplicationAttempt(app_1, user_d); // same user d.submitApplicationAttempt(app_1, user_d); // same user
} }
@ -373,7 +388,7 @@ public class TestLeafQueue {
final ApplicationAttemptId appAttemptId_1 = TestUtils final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2); .getMockApplicationAttemptId(0, 2);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null, FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null,
rmContext); spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
assertEquals(1, a.getMetrics().getAppsSubmitted()); assertEquals(1, a.getMetrics().getAppsSubmitted());
@ -411,14 +426,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a, new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
@ -545,21 +560,21 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext); a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a, new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext); a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 = final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 = FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a, new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext); a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1); a.submitApplicationAttempt(app_2, user_1);
// Setup some nodes // Setup some nodes
@ -639,21 +654,21 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext); a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a, new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext); a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 = final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 = FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a, new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext); a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1); a.submitApplicationAttempt(app_2, user_1);
// Setup some nodes // Setup some nodes
@ -750,28 +765,28 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext); a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a, new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext); a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 = final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 = FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a, new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext); a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1); a.submitApplicationAttempt(app_2, user_1);
final ApplicationAttemptId appAttemptId_3 = final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0); TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 = FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_2, a, new FiCaSchedulerApp(appAttemptId_3, user_2, a,
a.getActiveUsersManager(), rmContext); a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_3, user_2); a.submitApplicationAttempt(app_3, user_2);
// Setup some nodes // Setup some nodes
@ -935,14 +950,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a, new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_1); a.submitApplicationAttempt(app_1, user_1);
// Setup some nodes // Setup some nodes
@ -1043,14 +1058,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a, new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_1); a.submitApplicationAttempt(app_1, user_1);
// Setup some nodes // Setup some nodes
@ -1150,14 +1165,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a, new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_1); a.submitApplicationAttempt(app_1, user_1);
// Setup some nodes // Setup some nodes
@ -1277,7 +1292,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext)); mock(ActiveUsersManager.class), spyRMContext));
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
// Setup some nodes and racks // Setup some nodes and racks
@ -1418,7 +1433,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext)); mock(ActiveUsersManager.class), spyRMContext));
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
// Setup some nodes and racks // Setup some nodes and racks
@ -1549,7 +1564,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext)); mock(ActiveUsersManager.class), spyRMContext));
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
// Setup some nodes and racks // Setup some nodes and racks
@ -1652,21 +1667,21 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_e, e, new FiCaSchedulerApp(appAttemptId_0, user_e, e,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
e.submitApplicationAttempt(app_0, user_e); e.submitApplicationAttempt(app_0, user_e);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_e, e, new FiCaSchedulerApp(appAttemptId_1, user_e, e,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
e.submitApplicationAttempt(app_1, user_e); // same user e.submitApplicationAttempt(app_1, user_e); // same user
final ApplicationAttemptId appAttemptId_2 = final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 = FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_e, e, new FiCaSchedulerApp(appAttemptId_2, user_e, e,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
e.submitApplicationAttempt(app_2, user_e); // same user e.submitApplicationAttempt(app_2, user_e); // same user
// before reinitialization // before reinitialization
@ -1730,21 +1745,21 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_e, e, new FiCaSchedulerApp(appAttemptId_0, user_e, e,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
e.submitApplicationAttempt(app_0, user_e); e.submitApplicationAttempt(app_0, user_e);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_e, e, new FiCaSchedulerApp(appAttemptId_1, user_e, e,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
e.submitApplicationAttempt(app_1, user_e); // same user e.submitApplicationAttempt(app_1, user_e); // same user
final ApplicationAttemptId appAttemptId_2 = final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 = FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_e, e, new FiCaSchedulerApp(appAttemptId_2, user_e, e,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
e.submitApplicationAttempt(app_2, user_e); // same user e.submitApplicationAttempt(app_2, user_e); // same user
// before updating cluster resource // before updating cluster resource
@ -1807,14 +1822,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext)); mock(ActiveUsersManager.class), spyRMContext));
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext)); mock(ActiveUsersManager.class), spyRMContext));
a.submitApplicationAttempt(app_1, user_0); a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes and racks // Setup some nodes and racks
@ -2062,14 +2077,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a, new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext); mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
// Setup some nodes // Setup some nodes

View File

@ -162,7 +162,7 @@ public class FairSchedulerTestBase {
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
new RMAppAttemptMetrics(id)); new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
resourceManager.getRMContext().getRMApps() resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp); .put(id.getApplicationId(), rmApp);
return id; return id;
@ -183,7 +183,7 @@ public class FairSchedulerTestBase {
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
new RMAppAttemptMetrics(id)); new RMAppAttemptMetrics(id,resourceManager.getRMContext()));
resourceManager.getRMContext().getRMApps() resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp); .put(id.getApplicationId(), rmApp);
return id; return id;

View File

@ -136,7 +136,7 @@ public class TestRMWebAppFairScheduler {
MockRMApp app = new MockRMApp(i, i, state) { MockRMApp app = new MockRMApp(i, i, state) {
@Override @Override
public RMAppMetrics getRMAppMetrics() { public RMAppMetrics getRMAppMetrics() {
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0); return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0);
} }
@Override @Override
public YarnApplicationState createApplicationState() { public YarnApplicationState createApplicationState() {

View File

@ -1322,7 +1322,7 @@ public class TestRMWebServicesApps extends JerseyTest {
Exception { Exception {
// 28 because trackingUrl not assigned yet // 28 because trackingUrl not assigned yet
assertEquals("incorrect number of elements", 24, info.length()); assertEquals("incorrect number of elements", 26, info.length());
verifyAppInfoGeneric(app, info.getString("id"), info.getString("user"), verifyAppInfoGeneric(app, info.getString("id"), info.getString("user"),
info.getString("name"), info.getString("applicationType"), info.getString("name"), info.getString("applicationType"),

View File

@ -1197,7 +1197,9 @@ ResourceManager REST API's.
"queue" : "default", "queue" : "default",
"allocatedMB" : 0, "allocatedMB" : 0,
"allocatedVCores" : 0, "allocatedVCores" : 0,
"runningContainers" : 0 "runningContainers" : 0,
"memorySeconds" : 151730,
"vcoreSeconds" : 103
}, },
{ {
"finishedTime" : 1326815789546, "finishedTime" : 1326815789546,
@ -1218,7 +1220,9 @@ ResourceManager REST API's.
"queue" : "default", "queue" : "default",
"allocatedMB" : 0, "allocatedMB" : 0,
"allocatedVCores" : 0, "allocatedVCores" : 0,
"runningContainers" : 1 "runningContainers" : 1,
"memorySeconds" : 640064,
"vcoreSeconds" : 442
} }
] ]
} }
@ -1271,6 +1275,8 @@ _01_000001</amContainerLogs>
<allocatedMB>0</allocatedMB> <allocatedMB>0</allocatedMB>
<allocatedVCores>0</allocatedVCores> <allocatedVCores>0</allocatedVCores>
<runningContainers>0</runningContainers> <runningContainers>0</runningContainers>
<memorySeconds>151730</memorySeconds>
<vcoreSeconds>103</vcoreSeconds>
</app> </app>
<app> <app>
<id>application_1326815542473_0002</id> <id>application_1326815542473_0002</id>
@ -1293,6 +1299,8 @@ _01_000001</amContainerLogs>
<allocatedMB>0</allocatedMB> <allocatedMB>0</allocatedMB>
<allocatedVCores>0</allocatedVCores> <allocatedVCores>0</allocatedVCores>
<runningContainers>0</runningContainers> <runningContainers>0</runningContainers>
<memorySeconds>640064</memorySeconds>
<vcoreSeconds>442</vcoreSeconds>
</app> </app>
</apps> </apps>
+---+ +---+
@ -1491,6 +1499,10 @@ _01_000001</amContainerLogs>
+---------------------------------------------------------------+ +---------------------------------------------------------------+
| runningContainers | int | The number of containers currently running for the application | | runningContainers | int | The number of containers currently running for the application |
+---------------------------------------------------------------+ +---------------------------------------------------------------+
| memorySeconds | long | The amount of memory the application has allocated (megabyte-seconds) |
*---------------+--------------+--------------------------------+
| vcoreSeconds | long | The amount of CPU resources the application has allocated (virtual core-seconds) |
*---------------+--------------+--------------------------------+
** Response Examples ** Response Examples
@ -1532,7 +1544,9 @@ _01_000001</amContainerLogs>
"elapsedTime" : 446748, "elapsedTime" : 446748,
"diagnostics" : "", "diagnostics" : "",
"trackingUrl" : "http://host.domain.com:8088/proxy/application_1326821518301_0005/jobhistory/job/job_1326821518301_5_5", "trackingUrl" : "http://host.domain.com:8088/proxy/application_1326821518301_0005/jobhistory/job/job_1326821518301_5_5",
"queue" : "a1" "queue" : "a1",
"memorySeconds" : 151730,
"vcoreSeconds" : 103
} }
} }
+---+ +---+
@ -1576,6 +1590,8 @@ _01_000001</amContainerLogs>
<elapsedTime>446748</elapsedTime> <elapsedTime>446748</elapsedTime>
<amContainerLogs>http://host.domain.com:8042/node/containerlogs/container_1326821518301_0005_01_000001</amContainerLogs> <amContainerLogs>http://host.domain.com:8042/node/containerlogs/container_1326821518301_0005_01_000001</amContainerLogs>
<amHostHttpAddress>host.domain.com:8042</amHostHttpAddress> <amHostHttpAddress>host.domain.com:8042</amHostHttpAddress>
<memorySeconds>151730</memorySeconds>
<vcoreSeconds>103</vcoreSeconds>
</app> </app>
+---+ +---+