YARN-7010. Federation: routing REST invocations transparently to multiple RMs (part 2 - getApps). (Contributed by Giovanni Matteo Fumarola via curino)
This commit is contained in:
parent
63fc1b0b6d
commit
cc8893edc0
|
@ -2753,6 +2753,15 @@ public class YarnConfiguration extends Configuration {
|
|||
"org.apache.hadoop.yarn.server.router.webapp."
|
||||
+ "DefaultRequestInterceptorREST";
|
||||
|
||||
/**
|
||||
* The interceptor class used in FederationInterceptorREST should return
|
||||
* partial AppReports.
|
||||
*/
|
||||
public static final String ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
|
||||
ROUTER_WEBAPP_PREFIX + "partial-result.enabled";
|
||||
public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
|
||||
false;
|
||||
|
||||
////////////////////////////////
|
||||
// Other Configs
|
||||
////////////////////////////////
|
||||
|
|
|
@ -159,6 +159,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
|
|||
|
||||
configurationPrefixToSkipCompare
|
||||
.add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
|
||||
configurationPrefixToSkipCompare
|
||||
.add(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
|
||||
|
||||
// Set by container-executor.cfg
|
||||
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
|
||||
|
|
|
@ -83,7 +83,7 @@ public class UnmanagedApplicationManager {
|
|||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(UnmanagedApplicationManager.class);
|
||||
private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000;
|
||||
private static final String APP_NAME = "UnmanagedAM";
|
||||
public static final String APP_NAME = "UnmanagedAM";
|
||||
private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name";
|
||||
|
||||
private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields.DeSel
|
|||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
@XmlRootElement(name = "app")
|
||||
|
@ -71,9 +72,9 @@ public class AppInfo {
|
|||
// these are ok for any user to see
|
||||
protected String id;
|
||||
protected String user;
|
||||
protected String name;
|
||||
private String name;
|
||||
protected String queue;
|
||||
protected YarnApplicationState state;
|
||||
private YarnApplicationState state;
|
||||
protected FinalApplicationStatus finalStatus;
|
||||
protected float progress;
|
||||
protected String trackingUI;
|
||||
|
@ -91,21 +92,21 @@ public class AppInfo {
|
|||
protected String amContainerLogs;
|
||||
protected String amHostHttpAddress;
|
||||
private String amRPCAddress;
|
||||
protected long allocatedMB;
|
||||
protected long allocatedVCores;
|
||||
protected long reservedMB;
|
||||
protected long reservedVCores;
|
||||
protected int runningContainers;
|
||||
protected long memorySeconds;
|
||||
protected long vcoreSeconds;
|
||||
private long allocatedMB;
|
||||
private long allocatedVCores;
|
||||
private long reservedMB;
|
||||
private long reservedVCores;
|
||||
private int runningContainers;
|
||||
private long memorySeconds;
|
||||
private long vcoreSeconds;
|
||||
protected float queueUsagePercentage;
|
||||
protected float clusterUsagePercentage;
|
||||
|
||||
// preemption info fields
|
||||
protected long preemptedResourceMB;
|
||||
protected long preemptedResourceVCores;
|
||||
protected int numNonAMContainerPreempted;
|
||||
protected int numAMContainerPreempted;
|
||||
private long preemptedResourceMB;
|
||||
private long preemptedResourceVCores;
|
||||
private int numNonAMContainerPreempted;
|
||||
private int numAMContainerPreempted;
|
||||
private long preemptedMemorySeconds;
|
||||
private long preemptedVcoreSeconds;
|
||||
|
||||
|
@ -142,12 +143,11 @@ public class AppInfo {
|
|||
|| YarnApplicationState.NEW_SAVING == this.state
|
||||
|| YarnApplicationState.SUBMITTED == this.state
|
||||
|| YarnApplicationState.ACCEPTED == this.state;
|
||||
this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED" : (app
|
||||
.getFinishTime() == 0 ? "ApplicationMaster" : "History");
|
||||
this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED"
|
||||
: (app.getFinishTime() == 0 ? "ApplicationMaster" : "History");
|
||||
if (!trackingUrlIsNotReady) {
|
||||
this.trackingUrl =
|
||||
WebAppUtils.getURLWithScheme(schemePrefix,
|
||||
trackingUrl);
|
||||
WebAppUtils.getURLWithScheme(schemePrefix, trackingUrl);
|
||||
this.trackingUrlPretty = this.trackingUrl;
|
||||
} else {
|
||||
this.trackingUrlPretty = "UNASSIGNED";
|
||||
|
@ -162,15 +162,15 @@ public class AppInfo {
|
|||
this.priority = 0;
|
||||
|
||||
if (app.getApplicationPriority() != null) {
|
||||
this.priority = app.getApplicationPriority()
|
||||
.getPriority();
|
||||
this.priority = app.getApplicationPriority().getPriority();
|
||||
}
|
||||
this.progress = app.getProgress() * 100;
|
||||
this.diagnostics = app.getDiagnostics().toString();
|
||||
if (diagnostics == null || diagnostics.isEmpty()) {
|
||||
this.diagnostics = "";
|
||||
}
|
||||
if (app.getApplicationTags() != null && !app.getApplicationTags().isEmpty()) {
|
||||
if (app.getApplicationTags() != null
|
||||
&& !app.getApplicationTags().isEmpty()) {
|
||||
this.applicationTags = Joiner.on(',').join(app.getApplicationTags());
|
||||
}
|
||||
this.finalStatus = app.getFinalApplicationStatus();
|
||||
|
@ -178,8 +178,8 @@ public class AppInfo {
|
|||
if (hasAccess) {
|
||||
this.startedTime = app.getStartTime();
|
||||
this.finishedTime = app.getFinishTime();
|
||||
this.elapsedTime = Times.elapsed(app.getStartTime(),
|
||||
app.getFinishTime());
|
||||
this.elapsedTime =
|
||||
Times.elapsed(app.getStartTime(), app.getFinishTime());
|
||||
this.logAggregationStatus = app.getLogAggregationStatusForAppReport();
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
if (attempt != null) {
|
||||
|
@ -194,8 +194,8 @@ public class AppInfo {
|
|||
|
||||
this.amRPCAddress = getAmRPCAddressFromRMAppAttempt(attempt);
|
||||
|
||||
ApplicationResourceUsageReport resourceReport = attempt
|
||||
.getApplicationResourceUsageReport();
|
||||
ApplicationResourceUsageReport resourceReport =
|
||||
attempt.getApplicationResourceUsageReport();
|
||||
if (resourceReport != null) {
|
||||
Resource usedResources = resourceReport.getUsedResources();
|
||||
Resource reservedResources = resourceReport.getReservedResources();
|
||||
|
@ -208,9 +208,10 @@ public class AppInfo {
|
|||
clusterUsagePercentage = resourceReport.getClusterUsagePercentage();
|
||||
}
|
||||
|
||||
/* When the deSelects parameter contains "resourceRequests",
|
||||
it skips returning massive ResourceRequest objects and vice versa.
|
||||
Default behavior is no skipping. (YARN-6280)
|
||||
/*
|
||||
* When the deSelects parameter contains "resourceRequests", it skips
|
||||
* returning massive ResourceRequest objects and vice versa. Default
|
||||
* behavior is no skipping. (YARN-6280)
|
||||
*/
|
||||
if (!deSelects.contains(DeSelectType.RESOURCE_REQUESTS)) {
|
||||
List<ResourceRequest> resourceRequestsRaw = rm.getRMContext()
|
||||
|
@ -228,12 +229,9 @@ public class AppInfo {
|
|||
|
||||
// copy preemption info fields
|
||||
RMAppMetrics appMetrics = app.getRMAppMetrics();
|
||||
numAMContainerPreempted =
|
||||
appMetrics.getNumAMContainersPreempted();
|
||||
preemptedResourceMB =
|
||||
appMetrics.getResourcePreempted().getMemorySize();
|
||||
numNonAMContainerPreempted =
|
||||
appMetrics.getNumNonAMContainersPreempted();
|
||||
numAMContainerPreempted = appMetrics.getNumAMContainersPreempted();
|
||||
preemptedResourceMB = appMetrics.getResourcePreempted().getMemorySize();
|
||||
numNonAMContainerPreempted = appMetrics.getNumNonAMContainersPreempted();
|
||||
preemptedResourceVCores =
|
||||
appMetrics.getResourcePreempted().getVirtualCores();
|
||||
memorySeconds = appMetrics.getMemorySeconds();
|
||||
|
@ -242,8 +240,7 @@ public class AppInfo {
|
|||
preemptedVcoreSeconds = appMetrics.getPreemptedVcoreSeconds();
|
||||
ApplicationSubmissionContext appSubmissionContext =
|
||||
app.getApplicationSubmissionContext();
|
||||
unmanagedApplication =
|
||||
appSubmissionContext.getUnmanagedAM();
|
||||
unmanagedApplication = appSubmissionContext.getUnmanagedAM();
|
||||
appNodeLabelExpression =
|
||||
app.getApplicationSubmissionContext().getNodeLabelExpression();
|
||||
amNodeLabelExpression = (unmanagedApplication) ? null
|
||||
|
@ -286,6 +283,7 @@ public class AppInfo {
|
|||
timeouts.add(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -417,22 +415,6 @@ public class AppInfo {
|
|||
return this.reservedVCores;
|
||||
}
|
||||
|
||||
public long getPreemptedMB() {
|
||||
return preemptedResourceMB;
|
||||
}
|
||||
|
||||
public long getPreemptedVCores() {
|
||||
return preemptedResourceVCores;
|
||||
}
|
||||
|
||||
public int getNumNonAMContainersPreempted() {
|
||||
return numNonAMContainerPreempted;
|
||||
}
|
||||
|
||||
public int getNumAMContainersPreempted() {
|
||||
return numAMContainerPreempted;
|
||||
}
|
||||
|
||||
public long getMemorySeconds() {
|
||||
return memorySeconds;
|
||||
}
|
||||
|
@ -448,10 +430,15 @@ public class AppInfo {
|
|||
public long getPreemptedVcoreSeconds() {
|
||||
return preemptedVcoreSeconds;
|
||||
}
|
||||
|
||||
public List<ResourceRequestInfo> getResourceRequests() {
|
||||
return this.resourceRequests;
|
||||
}
|
||||
|
||||
public void setResourceRequests(List<ResourceRequestInfo> resourceRequests) {
|
||||
this.resourceRequests = resourceRequests;
|
||||
}
|
||||
|
||||
public LogAggregationStatus getLogAggregationStatus() {
|
||||
return this.logAggregationStatus;
|
||||
}
|
||||
|
@ -475,4 +462,89 @@ public class AppInfo {
|
|||
public ResourcesInfo getResourceInfo() {
|
||||
return resourceInfo;
|
||||
}
|
||||
|
||||
public long getPreemptedResourceMB() {
|
||||
return preemptedResourceMB;
|
||||
}
|
||||
|
||||
public void setPreemptedResourceMB(long preemptedResourceMB) {
|
||||
this.preemptedResourceMB = preemptedResourceMB;
|
||||
}
|
||||
|
||||
public long getPreemptedResourceVCores() {
|
||||
return preemptedResourceVCores;
|
||||
}
|
||||
|
||||
public void setPreemptedResourceVCores(long preemptedResourceVCores) {
|
||||
this.preemptedResourceVCores = preemptedResourceVCores;
|
||||
}
|
||||
|
||||
public int getNumNonAMContainerPreempted() {
|
||||
return numNonAMContainerPreempted;
|
||||
}
|
||||
|
||||
public void setNumNonAMContainerPreempted(int numNonAMContainerPreempted) {
|
||||
this.numNonAMContainerPreempted = numNonAMContainerPreempted;
|
||||
}
|
||||
|
||||
public int getNumAMContainerPreempted() {
|
||||
return numAMContainerPreempted;
|
||||
}
|
||||
|
||||
public void setNumAMContainerPreempted(int numAMContainerPreempted) {
|
||||
this.numAMContainerPreempted = numAMContainerPreempted;
|
||||
}
|
||||
|
||||
public void setPreemptedMemorySeconds(long preemptedMemorySeconds) {
|
||||
this.preemptedMemorySeconds = preemptedMemorySeconds;
|
||||
}
|
||||
|
||||
public void setPreemptedVcoreSeconds(long preemptedVcoreSeconds) {
|
||||
this.preemptedVcoreSeconds = preemptedVcoreSeconds;
|
||||
}
|
||||
|
||||
public void setAllocatedMB(long allocatedMB) {
|
||||
this.allocatedMB = allocatedMB;
|
||||
}
|
||||
|
||||
public void setAllocatedVCores(long allocatedVCores) {
|
||||
this.allocatedVCores = allocatedVCores;
|
||||
}
|
||||
|
||||
public void setReservedMB(long reservedMB) {
|
||||
this.reservedMB = reservedMB;
|
||||
}
|
||||
|
||||
public void setReservedVCores(long reservedVCores) {
|
||||
this.reservedVCores = reservedVCores;
|
||||
}
|
||||
|
||||
public void setRunningContainers(int runningContainers) {
|
||||
this.runningContainers = runningContainers;
|
||||
}
|
||||
|
||||
public void setMemorySeconds(long memorySeconds) {
|
||||
this.memorySeconds = memorySeconds;
|
||||
}
|
||||
|
||||
public void setVcoreSeconds(long vcoreSeconds) {
|
||||
this.vcoreSeconds = vcoreSeconds;
|
||||
}
|
||||
|
||||
public void setAppId(String appId) {
|
||||
this.id = appId;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setAMHostHttpAddress(String amHost) {
|
||||
this.amHostHttpAddress = amHost;
|
||||
}
|
||||
|
||||
public void setState(YarnApplicationState state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,4 +40,8 @@ public class AppsInfo {
|
|||
return app;
|
||||
}
|
||||
|
||||
public void addAll(ArrayList<AppInfo> appsInfo) {
|
||||
app.addAll(appsInfo);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -49,6 +49,8 @@ public final class RouterMetrics {
|
|||
private MutableGaugeInt numAppsFailedKilled;
|
||||
@Metric("# of application reports failed to be retrieved")
|
||||
private MutableGaugeInt numAppsFailedRetrieved;
|
||||
@Metric("# of multiple applications reports failed to be retrieved")
|
||||
private MutableGaugeInt numMultipleAppsFailedRetrieved;
|
||||
|
||||
// Aggregate metrics are shared, and don't have to be looked up per call
|
||||
@Metric("Total number of successful Submitted apps and latency(ms)")
|
||||
|
@ -59,6 +61,9 @@ public final class RouterMetrics {
|
|||
private MutableRate totalSucceededAppsCreated;
|
||||
@Metric("Total number of successful Retrieved app reports and latency(ms)")
|
||||
private MutableRate totalSucceededAppsRetrieved;
|
||||
@Metric("Total number of successful Retrieved multiple apps reports and "
|
||||
+ "latency(ms)")
|
||||
private MutableRate totalSucceededMultipleAppsRetrieved;
|
||||
|
||||
/**
|
||||
* Provide quantile counters for all latencies.
|
||||
|
@ -67,6 +72,7 @@ public final class RouterMetrics {
|
|||
private MutableQuantiles getNewApplicationLatency;
|
||||
private MutableQuantiles killApplicationLatency;
|
||||
private MutableQuantiles getApplicationReportLatency;
|
||||
private MutableQuantiles getApplicationsReportLatency;
|
||||
|
||||
private static volatile RouterMetrics INSTANCE = null;
|
||||
private static MetricsRegistry registry;
|
||||
|
@ -83,6 +89,9 @@ public final class RouterMetrics {
|
|||
getApplicationReportLatency =
|
||||
registry.newQuantiles("getApplicationReportLatency",
|
||||
"latency of get application report", "ops", "latency", 10);
|
||||
getApplicationsReportLatency =
|
||||
registry.newQuantiles("getApplicationsReportLatency",
|
||||
"latency of get applications report", "ops", "latency", 10);
|
||||
}
|
||||
|
||||
public static RouterMetrics getMetrics() {
|
||||
|
@ -124,6 +133,11 @@ public final class RouterMetrics {
|
|||
return totalSucceededAppsRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededMultipleAppsRetrieved() {
|
||||
return totalSucceededMultipleAppsRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededAppsCreated() {
|
||||
return totalSucceededAppsCreated.lastStat().mean();
|
||||
|
@ -144,6 +158,11 @@ public final class RouterMetrics {
|
|||
return totalSucceededAppsRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededMultipleGetAppReport() {
|
||||
return totalSucceededMultipleAppsRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getAppsFailedCreated() {
|
||||
return numAppsFailedCreated.value();
|
||||
|
@ -164,6 +183,11 @@ public final class RouterMetrics {
|
|||
return numAppsFailedRetrieved.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getMultipleAppsFailedRetrieved() {
|
||||
return numMultipleAppsFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public void succeededAppsCreated(long duration) {
|
||||
totalSucceededAppsCreated.add(duration);
|
||||
getNewApplicationLatency.add(duration);
|
||||
|
@ -184,6 +208,11 @@ public final class RouterMetrics {
|
|||
getApplicationReportLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededMultipleAppsRetrieved(long duration) {
|
||||
totalSucceededMultipleAppsRetrieved.add(duration);
|
||||
getApplicationsReportLatency.add(duration);
|
||||
}
|
||||
|
||||
public void incrAppsFailedCreated() {
|
||||
numAppsFailedCreated.incr();
|
||||
}
|
||||
|
@ -200,4 +229,8 @@ public final class RouterMetrics {
|
|||
numAppsFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrMultipleAppsFailedRetrieved() {
|
||||
numMultipleAppsFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
}
|
|
@ -25,6 +25,11 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
@ -102,9 +107,15 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
private RouterPolicyFacade policyFacade;
|
||||
private RouterMetrics routerMetrics;
|
||||
private final Clock clock = new MonotonicClock();
|
||||
private boolean returnPartialReport;
|
||||
|
||||
private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
|
||||
|
||||
/**
|
||||
* Thread pool used for asynchronous operations.
|
||||
*/
|
||||
private ExecutorService threadpool;
|
||||
|
||||
@Override
|
||||
public void init(String user) {
|
||||
federationFacade = FederationStateStoreFacade.getInstance();
|
||||
|
@ -125,6 +136,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
|
||||
interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
|
||||
routerMetrics = RouterMetrics.getMetrics();
|
||||
threadpool = Executors.newCachedThreadPool();
|
||||
|
||||
returnPartialReport =
|
||||
conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
|
||||
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
|
||||
}
|
||||
|
||||
private SubClusterId getRandomActiveSubCluster(
|
||||
|
@ -586,6 +602,99 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* The Yarn Router will forward the request to all the Yarn RMs in parallel,
|
||||
* after that it will group all the ApplicationReports by the ApplicationId.
|
||||
* <p>
|
||||
* Possible failure:
|
||||
* <p>
|
||||
* Client: identical behavior as {@code RMWebServices}.
|
||||
* <p>
|
||||
* Router: the Client will timeout and resubmit the request.
|
||||
* <p>
|
||||
* ResourceManager: the Router calls each Yarn RM in parallel by using one
|
||||
* thread for each Yarn RM. In case a Yarn RM fails, a single call will
|
||||
* timeout. However the Router will merge the ApplicationReports it got, and
|
||||
* provides a partial list to the client.
|
||||
* <p>
|
||||
* State Store: the Router will timeout and it will retry depending on the
|
||||
* FederationFacade settings - if the failure happened before the select
|
||||
* operation.
|
||||
*/
|
||||
@Override
|
||||
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
|
||||
Set<String> statesQuery, String finalStatusQuery, String userQuery,
|
||||
String queueQuery, String count, String startedBegin, String startedEnd,
|
||||
String finishBegin, String finishEnd, Set<String> applicationTypes,
|
||||
Set<String> applicationTags, Set<String> unselectedFields) {
|
||||
AppsInfo apps = new AppsInfo();
|
||||
long startTime = clock.getTime();
|
||||
|
||||
Map<SubClusterId, SubClusterInfo> subClustersActive = null;
|
||||
try {
|
||||
subClustersActive = federationFacade.getSubClusters(true);
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrMultipleAppsFailedRetrieved();
|
||||
return null;
|
||||
}
|
||||
|
||||
// Send the requests in parallel
|
||||
|
||||
ExecutorCompletionService<AppsInfo> compSvc =
|
||||
new ExecutorCompletionService<AppsInfo>(this.threadpool);
|
||||
|
||||
for (final SubClusterInfo info : subClustersActive.values()) {
|
||||
compSvc.submit(new Callable<AppsInfo>() {
|
||||
@Override
|
||||
public AppsInfo call() {
|
||||
DefaultRequestInterceptorREST interceptor =
|
||||
getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
|
||||
info.getClientRMServiceAddress());
|
||||
AppsInfo rmApps = interceptor.getApps(hsr, stateQuery, statesQuery,
|
||||
finalStatusQuery, userQuery, queueQuery, count, startedBegin,
|
||||
startedEnd, finishBegin, finishEnd, applicationTypes,
|
||||
applicationTags, unselectedFields);
|
||||
|
||||
if (rmApps == null) {
|
||||
routerMetrics.incrMultipleAppsFailedRetrieved();
|
||||
LOG.error("Subcluster " + info.getSubClusterId()
|
||||
+ " failed to return appReport.");
|
||||
return null;
|
||||
}
|
||||
return rmApps;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Collect all the responses in parallel
|
||||
|
||||
for (int i = 0; i < subClustersActive.values().size(); i++) {
|
||||
try {
|
||||
Future<AppsInfo> future = compSvc.take();
|
||||
AppsInfo appsResponse = future.get();
|
||||
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
|
||||
|
||||
if (appsResponse != null) {
|
||||
apps.addAll(appsResponse.getApps());
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
routerMetrics.incrMultipleAppsFailedRetrieved();
|
||||
LOG.warn("Failed to get application report ", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (apps.getApps().isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Merge all the application reports got from all the available Yarn RMs
|
||||
|
||||
return RouterWebServiceUtil.mergeAppsInfo(apps.getApps(),
|
||||
returnPartialReport);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterInfo get() {
|
||||
return getClusterInfo();
|
||||
|
@ -639,15 +748,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
|
||||
Set<String> statesQuery, String finalStatusQuery, String userQuery,
|
||||
String queueQuery, String count, String startedBegin, String startedEnd,
|
||||
String finishBegin, String finishEnd, Set<String> applicationTypes,
|
||||
Set<String> applicationTags, Set<String> unselectedFields) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AppState getAppState(HttpServletRequest hsr, String appId)
|
||||
throws AuthorizationException {
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.router.webapp;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
@ -33,7 +35,11 @@ import javax.ws.rs.core.Response.ResponseBuilder;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
|
||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||
import org.apache.hadoop.yarn.webapp.ForbiddenException;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
|
@ -55,6 +61,8 @@ public final class RouterWebServiceUtil {
|
|||
private static final Log LOG =
|
||||
LogFactory.getLog(RouterWebServiceUtil.class.getName());
|
||||
|
||||
private final static String PARTIAL_REPORT = "Partial Report ";
|
||||
|
||||
/** Disable constructor. */
|
||||
private RouterWebServiceUtil() {
|
||||
}
|
||||
|
@ -224,4 +232,103 @@ public final class RouterWebServiceUtil {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges a list of AppInfo grouping by ApplicationId. Our current policy
|
||||
* is to merge the application reports from the reacheable SubClusters.
|
||||
* Via configuration parameter, we decide whether to return applications
|
||||
* for which the primary AM is missing or to omit them.
|
||||
*
|
||||
* @param appsInfo a list of AppInfo to merge
|
||||
* @param returnPartialResult if the merge AppsInfo should contain partial
|
||||
* result or not
|
||||
* @return the merged AppsInfo
|
||||
*/
|
||||
public static AppsInfo mergeAppsInfo(ArrayList<AppInfo> appsInfo,
|
||||
boolean returnPartialResult) {
|
||||
AppsInfo allApps = new AppsInfo();
|
||||
|
||||
Map<String, AppInfo> federationAM = new HashMap<String, AppInfo>();
|
||||
Map<String, AppInfo> federationUAMSum = new HashMap<String, AppInfo>();
|
||||
for (AppInfo a : appsInfo) {
|
||||
// Check if this AppInfo is an AM
|
||||
if (a.getAMHostHttpAddress() != null) {
|
||||
// Insert in the list of AM
|
||||
federationAM.put(a.getAppId(), a);
|
||||
// Check if there are any UAM found before
|
||||
if (federationUAMSum.containsKey(a.getAppId())) {
|
||||
// Merge the current AM with the found UAM
|
||||
mergeAMWithUAM(a, federationUAMSum.get(a.getAppId()));
|
||||
// Remove the sum of the UAMs
|
||||
federationUAMSum.remove(a.getAppId());
|
||||
}
|
||||
// This AppInfo is an UAM
|
||||
} else {
|
||||
if (federationAM.containsKey(a.getAppId())) {
|
||||
// Merge the current UAM with its own AM
|
||||
mergeAMWithUAM(federationAM.get(a.getAppId()), a);
|
||||
} else if (federationUAMSum.containsKey(a.getAppId())) {
|
||||
// Merge the current UAM with its own UAM and update the list of UAM
|
||||
federationUAMSum.put(a.getAppId(),
|
||||
mergeUAMWithUAM(federationUAMSum.get(a.getAppId()), a));
|
||||
} else {
|
||||
// Insert in the list of UAM
|
||||
federationUAMSum.put(a.getAppId(), a);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check the remaining UAMs are depending or not from federation
|
||||
for (AppInfo a : federationUAMSum.values()) {
|
||||
if (returnPartialResult || (a.getName() != null
|
||||
&& !(a.getName().startsWith(UnmanagedApplicationManager.APP_NAME)
|
||||
|| a.getName().startsWith(PARTIAL_REPORT)))) {
|
||||
federationAM.put(a.getAppId(), a);
|
||||
}
|
||||
}
|
||||
|
||||
allApps.addAll(new ArrayList<AppInfo>(federationAM.values()));
|
||||
return allApps;
|
||||
}
|
||||
|
||||
private static AppInfo mergeUAMWithUAM(AppInfo uam1, AppInfo uam2) {
|
||||
AppInfo partialReport = new AppInfo();
|
||||
partialReport.setAppId(uam1.getAppId());
|
||||
partialReport.setName(PARTIAL_REPORT + uam1.getAppId());
|
||||
// We pick the status of the first uam
|
||||
partialReport.setState(uam1.getState());
|
||||
// Merge the newly partial AM with UAM1 and then with UAM2
|
||||
mergeAMWithUAM(partialReport, uam1);
|
||||
mergeAMWithUAM(partialReport, uam2);
|
||||
return partialReport;
|
||||
}
|
||||
|
||||
private static void mergeAMWithUAM(AppInfo am, AppInfo uam) {
|
||||
am.setPreemptedResourceMB(
|
||||
am.getPreemptedResourceMB() + uam.getPreemptedResourceMB());
|
||||
am.setPreemptedResourceVCores(
|
||||
am.getPreemptedResourceVCores() + uam.getPreemptedResourceVCores());
|
||||
am.setNumNonAMContainerPreempted(am.getNumNonAMContainerPreempted()
|
||||
+ uam.getNumNonAMContainerPreempted());
|
||||
am.setNumAMContainerPreempted(
|
||||
am.getNumAMContainerPreempted() + uam.getNumAMContainerPreempted());
|
||||
am.setPreemptedMemorySeconds(
|
||||
am.getPreemptedMemorySeconds() + uam.getPreemptedMemorySeconds());
|
||||
am.setPreemptedVcoreSeconds(
|
||||
am.getPreemptedVcoreSeconds() + uam.getPreemptedVcoreSeconds());
|
||||
|
||||
if (am.getState() == YarnApplicationState.RUNNING
|
||||
&& uam.getState() == am.getState()) {
|
||||
|
||||
am.getResourceRequests().addAll(uam.getResourceRequests());
|
||||
|
||||
am.setAllocatedMB(am.getAllocatedMB() + uam.getAllocatedMB());
|
||||
am.setAllocatedVCores(am.getAllocatedVCores() + uam.getAllocatedVCores());
|
||||
am.setReservedMB(am.getReservedMB() + uam.getReservedMB());
|
||||
am.setReservedVCores(am.getReservedVCores() + uam.getReservedMB());
|
||||
am.setRunningContainers(
|
||||
am.getRunningContainers() + uam.getRunningContainers());
|
||||
am.setMemorySeconds(am.getMemorySeconds() + uam.getMemorySeconds());
|
||||
am.setVcoreSeconds(am.getVcoreSeconds() + uam.getVcoreSeconds());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -196,6 +196,45 @@ public class TestRouterMetrics {
|
|||
Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedRetrieved());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of the metric: Retrieved Multiple Apps
|
||||
* successfully.
|
||||
*/
|
||||
@Test
|
||||
public void testSucceededMultipleAppsReport() {
|
||||
|
||||
long totalGoodBefore = metrics.getNumSucceededMultipleAppsRetrieved();
|
||||
|
||||
goodSubCluster.getApplicationsReport(100);
|
||||
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededMultipleAppsRetrieved());
|
||||
Assert.assertEquals(100, metrics.getLatencySucceededMultipleGetAppReport(),
|
||||
0);
|
||||
|
||||
goodSubCluster.getApplicationsReport(200);
|
||||
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededMultipleAppsRetrieved());
|
||||
Assert.assertEquals(150, metrics.getLatencySucceededMultipleGetAppReport(),
|
||||
0);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of the metric: Failed to retrieve
|
||||
* Multiple Apps.
|
||||
*/
|
||||
@Test
|
||||
public void testMulipleAppsReportFailed() {
|
||||
|
||||
long totalBadbefore = metrics.getMultipleAppsFailedRetrieved();
|
||||
|
||||
badSubCluster.getApplicationsReport();
|
||||
|
||||
Assert.assertEquals(totalBadbefore + 1,
|
||||
metrics.getMultipleAppsFailedRetrieved());
|
||||
}
|
||||
|
||||
// Records failures for all calls
|
||||
private class MockBadSubCluster {
|
||||
public void getNewApplication() {
|
||||
|
@ -217,6 +256,11 @@ public class TestRouterMetrics {
|
|||
LOG.info("Mocked: failed getApplicationReport call");
|
||||
metrics.incrAppsFailedRetrieved();
|
||||
}
|
||||
|
||||
public void getApplicationsReport() {
|
||||
LOG.info("Mocked: failed getApplicationsReport call");
|
||||
metrics.incrMultipleAppsFailedRetrieved();
|
||||
}
|
||||
}
|
||||
|
||||
// Records successes for all calls
|
||||
|
@ -244,5 +288,11 @@ public class TestRouterMetrics {
|
|||
duration);
|
||||
metrics.succeededAppsRetrieved(duration);
|
||||
}
|
||||
|
||||
public void getApplicationsReport(long duration) {
|
||||
LOG.info("Mocked: successful getApplicationsReport call with duration {}",
|
||||
duration);
|
||||
metrics.succeededMultipleAppsRetrieved(duration);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,26 +18,32 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.router.webapp;
|
||||
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.core.HttpHeaders;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.core.HttpHeaders;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This class mocks the RESTRequestInterceptor.
|
||||
*/
|
||||
|
@ -100,6 +106,27 @@ public class MockDefaultRequestInterceptorREST
|
|||
return new AppInfo();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
|
||||
Set<String> statesQuery, String finalStatusQuery, String userQuery,
|
||||
String queueQuery, String count, String startedBegin, String startedEnd,
|
||||
String finishBegin, String finishEnd, Set<String> applicationTypes,
|
||||
Set<String> applicationTags, Set<String> unselectedFields) {
|
||||
if (!isRunning) {
|
||||
throw new RuntimeException("RM is stopped");
|
||||
}
|
||||
AppsInfo appsInfo = new AppsInfo();
|
||||
AppInfo appInfo = new AppInfo();
|
||||
|
||||
appInfo.setAppId(
|
||||
ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
|
||||
applicationCounter.incrementAndGet()).toString());
|
||||
appInfo.setAMHostHttpAddress("http://i_am_the_AM:1234");
|
||||
|
||||
appsInfo.add(appInfo);
|
||||
return appsInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response updateAppState(AppState targetState, HttpServletRequest hsr,
|
||||
String appId) throws AuthorizationException, YarnException,
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUt
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -374,4 +375,20 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
|
|||
Assert.assertNull(response);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetApplicationsReport in case each
|
||||
* subcluster provided one application.
|
||||
*/
|
||||
@Test
|
||||
public void testGetApplicationsReport()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
|
||||
AppsInfo responseGet = interceptor.getApps(null, null, null, null, null,
|
||||
null, null, null, null, null, null, null, null, null);
|
||||
|
||||
Assert.assertNotNull(responseGet);
|
||||
Assert.assertEquals(NUM_SUBCLUSTER, responseGet.getApps().size());
|
||||
// The merged operations will be tested in TestRouterWebServiceUtil
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
|
||||
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
|
||||
|
@ -271,4 +272,48 @@ public class TestFederationInterceptorRESTRetry
|
|||
.getApplicationHomeSubCluster().getHomeSubCluster());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetApps in case the cluster is
|
||||
* composed of only 1 bad SubCluster.
|
||||
*/
|
||||
@Test
|
||||
public void testGetAppsOneBadSC()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
|
||||
setupCluster(Arrays.asList(bad2));
|
||||
|
||||
AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
|
||||
null, null, null, null, null, null, null, null);
|
||||
Assert.assertNull(response);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetApps in case the cluster is
|
||||
* composed of only 2 bad SubClusters.
|
||||
*/
|
||||
@Test
|
||||
public void testGetAppsTwoBadSCs()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
setupCluster(Arrays.asList(bad1, bad2));
|
||||
|
||||
AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
|
||||
null, null, null, null, null, null, null, null);
|
||||
Assert.assertNull(response);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetApps in case the cluster is
|
||||
* composed of only 1 bad SubCluster and a good one.
|
||||
*/
|
||||
@Test
|
||||
public void testGetAppsOneBadOneGood()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
setupCluster(Arrays.asList(good, bad2));
|
||||
|
||||
AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
|
||||
null, null, null, null, null, null, null, null);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(1, response.getApps().size());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,311 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.webapp;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
|
||||
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test class to validate RouterWebServiceUtil methods.
|
||||
*/
|
||||
public class TestRouterWebServiceUtil {
|
||||
|
||||
private static final ApplicationId APPID1 = ApplicationId.newInstance(1, 1);
|
||||
private static final ApplicationId APPID2 = ApplicationId.newInstance(2, 1);
|
||||
private static final ApplicationId APPID3 = ApplicationId.newInstance(3, 1);
|
||||
private static final ApplicationId APPID4 = ApplicationId.newInstance(4, 1);
|
||||
|
||||
/**
|
||||
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
|
||||
* in case we want to merge 4 AMs. The expected result would be the same 4
|
||||
* AMs.
|
||||
*/
|
||||
@Test
|
||||
public void testMerge4DifferentApps() {
|
||||
|
||||
AppsInfo apps = new AppsInfo();
|
||||
int value = 1000;
|
||||
|
||||
AppInfo app1 = new AppInfo();
|
||||
app1.setAppId(APPID1.toString());
|
||||
app1.setAMHostHttpAddress("http://i_am_the_AM1:1234");
|
||||
app1.setState(YarnApplicationState.FINISHED);
|
||||
app1.setNumAMContainerPreempted(value);
|
||||
apps.add(app1);
|
||||
|
||||
AppInfo app2 = new AppInfo();
|
||||
app2.setAppId(APPID2.toString());
|
||||
app2.setAMHostHttpAddress("http://i_am_the_AM2:1234");
|
||||
app2.setState(YarnApplicationState.ACCEPTED);
|
||||
app2.setAllocatedVCores(2 * value);
|
||||
|
||||
apps.add(app2);
|
||||
|
||||
AppInfo app3 = new AppInfo();
|
||||
app3.setAppId(APPID3.toString());
|
||||
app3.setAMHostHttpAddress("http://i_am_the_AM3:1234");
|
||||
app3.setState(YarnApplicationState.RUNNING);
|
||||
app3.setReservedMB(3 * value);
|
||||
apps.add(app3);
|
||||
|
||||
AppInfo app4 = new AppInfo();
|
||||
app4.setAppId(APPID4.toString());
|
||||
app4.setAMHostHttpAddress("http://i_am_the_AM4:1234");
|
||||
app4.setState(YarnApplicationState.NEW);
|
||||
app4.setAllocatedMB(4 * value);
|
||||
apps.add(app4);
|
||||
|
||||
AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(4, result.getApps().size());
|
||||
|
||||
List<String> appIds = new ArrayList<String>();
|
||||
AppInfo appInfo1 = null, appInfo2 = null, appInfo3 = null, appInfo4 = null;
|
||||
for (AppInfo app : result.getApps()) {
|
||||
appIds.add(app.getAppId());
|
||||
if (app.getAppId().equals(APPID1.toString())) {
|
||||
appInfo1 = app;
|
||||
}
|
||||
if (app.getAppId().equals(APPID2.toString())) {
|
||||
appInfo2 = app;
|
||||
}
|
||||
if (app.getAppId().equals(APPID3.toString())) {
|
||||
appInfo3 = app;
|
||||
}
|
||||
if (app.getAppId().equals(APPID4.toString())) {
|
||||
appInfo4 = app;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertTrue(appIds.contains(APPID1.toString()));
|
||||
Assert.assertTrue(appIds.contains(APPID2.toString()));
|
||||
Assert.assertTrue(appIds.contains(APPID3.toString()));
|
||||
Assert.assertTrue(appIds.contains(APPID4.toString()));
|
||||
|
||||
// Check preservations APP1
|
||||
Assert.assertEquals(app1.getState(), appInfo1.getState());
|
||||
Assert.assertEquals(app1.getNumAMContainerPreempted(),
|
||||
appInfo1.getNumAMContainerPreempted());
|
||||
|
||||
// Check preservations APP2
|
||||
Assert.assertEquals(app2.getState(), appInfo2.getState());
|
||||
Assert.assertEquals(app3.getAllocatedVCores(),
|
||||
appInfo3.getAllocatedVCores());
|
||||
|
||||
// Check preservations APP3
|
||||
Assert.assertEquals(app3.getState(), appInfo3.getState());
|
||||
Assert.assertEquals(app3.getReservedMB(), appInfo3.getReservedMB());
|
||||
|
||||
// Check preservations APP3
|
||||
Assert.assertEquals(app4.getState(), appInfo4.getState());
|
||||
Assert.assertEquals(app3.getAllocatedMB(), appInfo3.getAllocatedMB());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
|
||||
* in case we want to merge 2 UAMs and their own AM. The status of the AM is
|
||||
* FINISHED, so we check the correctness of the merging of the historical
|
||||
* values. The expected result would be 1 report with the merged information.
|
||||
*/
|
||||
@Test
|
||||
public void testMergeAppsFinished() {
|
||||
|
||||
AppsInfo apps = new AppsInfo();
|
||||
|
||||
String amHost = "http://i_am_the_AM1:1234";
|
||||
AppInfo am = new AppInfo();
|
||||
am.setAppId(APPID1.toString());
|
||||
am.setAMHostHttpAddress(amHost);
|
||||
am.setState(YarnApplicationState.FINISHED);
|
||||
|
||||
int value = 1000;
|
||||
setAppInfoFinished(am, value);
|
||||
|
||||
apps.add(am);
|
||||
|
||||
AppInfo uam1 = new AppInfo();
|
||||
uam1.setAppId(APPID1.toString());
|
||||
apps.add(uam1);
|
||||
|
||||
setAppInfoFinished(uam1, value);
|
||||
|
||||
AppInfo uam2 = new AppInfo();
|
||||
uam2.setAppId(APPID1.toString());
|
||||
apps.add(uam2);
|
||||
|
||||
setAppInfoFinished(uam2, value);
|
||||
|
||||
// in this case the result does not change if we enable partial result
|
||||
AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(1, result.getApps().size());
|
||||
|
||||
AppInfo app = result.getApps().get(0);
|
||||
|
||||
Assert.assertEquals(APPID1.toString(), app.getAppId());
|
||||
Assert.assertEquals(amHost, app.getAMHostHttpAddress());
|
||||
Assert.assertEquals(value * 3, app.getPreemptedResourceMB());
|
||||
Assert.assertEquals(value * 3, app.getPreemptedResourceVCores());
|
||||
Assert.assertEquals(value * 3, app.getNumNonAMContainerPreempted());
|
||||
Assert.assertEquals(value * 3, app.getNumAMContainerPreempted());
|
||||
Assert.assertEquals(value * 3, app.getPreemptedMemorySeconds());
|
||||
Assert.assertEquals(value * 3, app.getPreemptedVcoreSeconds());
|
||||
}
|
||||
|
||||
private void setAppInfoFinished(AppInfo am, int value) {
|
||||
am.setPreemptedResourceMB(value);
|
||||
am.setPreemptedResourceVCores(value);
|
||||
am.setNumNonAMContainerPreempted(value);
|
||||
am.setNumAMContainerPreempted(value);
|
||||
am.setPreemptedMemorySeconds(value);
|
||||
am.setPreemptedVcoreSeconds(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
|
||||
* in case we want to merge 2 UAMs and their own AM. The status of the AM is
|
||||
* RUNNING, so we check the correctness of the merging of the runtime values.
|
||||
* The expected result would be 1 report with the merged information.
|
||||
*/
|
||||
@Test
|
||||
public void testMergeAppsRunning() {
|
||||
|
||||
AppsInfo apps = new AppsInfo();
|
||||
|
||||
String amHost = "http://i_am_the_AM2:1234";
|
||||
AppInfo am = new AppInfo();
|
||||
am.setAppId(APPID2.toString());
|
||||
am.setAMHostHttpAddress(amHost);
|
||||
am.setState(YarnApplicationState.RUNNING);
|
||||
|
||||
int value = 1000;
|
||||
setAppInfoRunning(am, value);
|
||||
|
||||
apps.add(am);
|
||||
|
||||
AppInfo uam1 = new AppInfo();
|
||||
uam1.setAppId(APPID2.toString());
|
||||
uam1.setState(YarnApplicationState.RUNNING);
|
||||
apps.add(uam1);
|
||||
|
||||
setAppInfoRunning(uam1, value);
|
||||
|
||||
AppInfo uam2 = new AppInfo();
|
||||
uam2.setAppId(APPID2.toString());
|
||||
uam2.setState(YarnApplicationState.RUNNING);
|
||||
apps.add(uam2);
|
||||
|
||||
setAppInfoRunning(uam2, value);
|
||||
|
||||
// in this case the result does not change if we enable partial result
|
||||
AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(1, result.getApps().size());
|
||||
|
||||
AppInfo app = result.getApps().get(0);
|
||||
|
||||
Assert.assertEquals(APPID2.toString(), app.getAppId());
|
||||
Assert.assertEquals(amHost, app.getAMHostHttpAddress());
|
||||
Assert.assertEquals(value * 3, app.getAllocatedMB());
|
||||
Assert.assertEquals(value * 3, app.getAllocatedVCores());
|
||||
Assert.assertEquals(value * 3, app.getReservedMB());
|
||||
Assert.assertEquals(value * 3, app.getReservedVCores());
|
||||
Assert.assertEquals(value * 3, app.getRunningContainers());
|
||||
Assert.assertEquals(value * 3, app.getMemorySeconds());
|
||||
Assert.assertEquals(value * 3, app.getVcoreSeconds());
|
||||
Assert.assertEquals(3, app.getResourceRequests().size());
|
||||
}
|
||||
|
||||
private void setAppInfoRunning(AppInfo am, int value) {
|
||||
am.getResourceRequests().add(new ResourceRequestInfo());
|
||||
am.setAllocatedMB(value);
|
||||
am.setAllocatedVCores(value);
|
||||
am.setReservedMB(value);
|
||||
am.setReservedVCores(value);
|
||||
am.setRunningContainers(value);
|
||||
am.setMemorySeconds(value);
|
||||
am.setVcoreSeconds(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
|
||||
* in case we want to merge 2 UAMs without their own AM. The expected result
|
||||
* would be an empty report or a partial report of the 2 UAMs depending on the
|
||||
* selected policy.
|
||||
*/
|
||||
@Test
|
||||
public void testMerge2UAM() {
|
||||
|
||||
AppsInfo apps = new AppsInfo();
|
||||
|
||||
AppInfo app1 = new AppInfo();
|
||||
app1.setAppId(APPID1.toString());
|
||||
app1.setName(UnmanagedApplicationManager.APP_NAME);
|
||||
app1.setState(YarnApplicationState.RUNNING);
|
||||
apps.add(app1);
|
||||
|
||||
AppInfo app2 = new AppInfo();
|
||||
app2.setAppId(APPID1.toString());
|
||||
app2.setName(UnmanagedApplicationManager.APP_NAME);
|
||||
app2.setState(YarnApplicationState.RUNNING);
|
||||
apps.add(app2);
|
||||
|
||||
AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(0, result.getApps().size());
|
||||
|
||||
// By enabling partial result, the expected result would be a partial report
|
||||
// of the 2 UAMs
|
||||
AppsInfo result2 = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), true);
|
||||
Assert.assertNotNull(result2);
|
||||
Assert.assertEquals(1, result2.getApps().size());
|
||||
Assert.assertEquals(YarnApplicationState.RUNNING,
|
||||
result2.getApps().get(0).getState());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
|
||||
* in case we want to merge 1 UAM that does not depend on Federation. The
|
||||
* excepted result would be the same app report.
|
||||
*/
|
||||
@Test
|
||||
public void testMergeUAM() {
|
||||
|
||||
AppsInfo apps = new AppsInfo();
|
||||
|
||||
AppInfo app1 = new AppInfo();
|
||||
app1.setAppId(APPID1.toString());
|
||||
app1.setName("Test");
|
||||
apps.add(app1);
|
||||
|
||||
// in this case the result does not change if we enable partial result
|
||||
AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(1, result.getApps().size());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue