YARN-7010. Federation: routing REST invocations transparently to multiple RMs (part 2 - getApps). (Contributed by Giovanni Matteo Fumarola via curino)

(cherry picked from commit cc8893edc0)
This commit is contained in:
Carlo Curino 2017-08-29 14:53:09 -07:00
parent 2aacb9d3fb
commit 88b32edb8f
13 changed files with 858 additions and 81 deletions

View File

@ -2671,6 +2671,15 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.router.webapp."
+ "DefaultRequestInterceptorREST";
/**
* The interceptor class used in FederationInterceptorREST should return
* partial AppReports.
*/
public static final String ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
ROUTER_WEBAPP_PREFIX + "partial-result.enabled";
public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
false;
////////////////////////////////
// Other Configs
////////////////////////////////

View File

@ -158,6 +158,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
// Set by container-executor.cfg
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);

View File

@ -83,7 +83,7 @@ public class UnmanagedApplicationManager {
private static final Logger LOG =
LoggerFactory.getLogger(UnmanagedApplicationManager.class);
private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000;
private static final String APP_NAME = "UnmanagedAM";
public static final String APP_NAME = "UnmanagedAM";
private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name";
private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields.DeSel
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@XmlRootElement(name = "app")
@ -71,9 +72,9 @@ public class AppInfo {
// these are ok for any user to see
protected String id;
protected String user;
protected String name;
private String name;
protected String queue;
protected YarnApplicationState state;
private YarnApplicationState state;
protected FinalApplicationStatus finalStatus;
protected float progress;
protected String trackingUI;
@ -91,21 +92,21 @@ public class AppInfo {
protected String amContainerLogs;
protected String amHostHttpAddress;
private String amRPCAddress;
protected long allocatedMB;
protected long allocatedVCores;
protected long reservedMB;
protected long reservedVCores;
protected int runningContainers;
protected long memorySeconds;
protected long vcoreSeconds;
private long allocatedMB;
private long allocatedVCores;
private long reservedMB;
private long reservedVCores;
private int runningContainers;
private long memorySeconds;
private long vcoreSeconds;
protected float queueUsagePercentage;
protected float clusterUsagePercentage;
// preemption info fields
protected long preemptedResourceMB;
protected long preemptedResourceVCores;
protected int numNonAMContainerPreempted;
protected int numAMContainerPreempted;
private long preemptedResourceMB;
private long preemptedResourceVCores;
private int numNonAMContainerPreempted;
private int numAMContainerPreempted;
private long preemptedMemorySeconds;
private long preemptedVcoreSeconds;
@ -141,12 +142,11 @@ public class AppInfo {
|| YarnApplicationState.NEW_SAVING == this.state
|| YarnApplicationState.SUBMITTED == this.state
|| YarnApplicationState.ACCEPTED == this.state;
this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED" : (app
.getFinishTime() == 0 ? "ApplicationMaster" : "History");
this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED"
: (app.getFinishTime() == 0 ? "ApplicationMaster" : "History");
if (!trackingUrlIsNotReady) {
this.trackingUrl =
WebAppUtils.getURLWithScheme(schemePrefix,
trackingUrl);
WebAppUtils.getURLWithScheme(schemePrefix, trackingUrl);
this.trackingUrlPretty = this.trackingUrl;
} else {
this.trackingUrlPretty = "UNASSIGNED";
@ -161,15 +161,15 @@ public class AppInfo {
this.priority = 0;
if (app.getApplicationPriority() != null) {
this.priority = app.getApplicationPriority()
.getPriority();
this.priority = app.getApplicationPriority().getPriority();
}
this.progress = app.getProgress() * 100;
this.diagnostics = app.getDiagnostics().toString();
if (diagnostics == null || diagnostics.isEmpty()) {
this.diagnostics = "";
}
if (app.getApplicationTags() != null && !app.getApplicationTags().isEmpty()) {
if (app.getApplicationTags() != null
&& !app.getApplicationTags().isEmpty()) {
this.applicationTags = Joiner.on(',').join(app.getApplicationTags());
}
this.finalStatus = app.getFinalApplicationStatus();
@ -177,8 +177,8 @@ public class AppInfo {
if (hasAccess) {
this.startedTime = app.getStartTime();
this.finishedTime = app.getFinishTime();
this.elapsedTime = Times.elapsed(app.getStartTime(),
app.getFinishTime());
this.elapsedTime =
Times.elapsed(app.getStartTime(), app.getFinishTime());
this.logAggregationStatus = app.getLogAggregationStatusForAppReport();
RMAppAttempt attempt = app.getCurrentAppAttempt();
if (attempt != null) {
@ -193,8 +193,8 @@ public class AppInfo {
this.amRPCAddress = getAmRPCAddressFromRMAppAttempt(attempt);
ApplicationResourceUsageReport resourceReport = attempt
.getApplicationResourceUsageReport();
ApplicationResourceUsageReport resourceReport =
attempt.getApplicationResourceUsageReport();
if (resourceReport != null) {
Resource usedResources = resourceReport.getUsedResources();
Resource reservedResources = resourceReport.getReservedResources();
@ -207,9 +207,10 @@ public class AppInfo {
clusterUsagePercentage = resourceReport.getClusterUsagePercentage();
}
/* When the deSelects parameter contains "resourceRequests",
it skips returning massive ResourceRequest objects and vice versa.
Default behavior is no skipping. (YARN-6280)
/*
* When the deSelects parameter contains "resourceRequests", it skips
* returning massive ResourceRequest objects and vice versa. Default
* behavior is no skipping. (YARN-6280)
*/
if (!deSelects.contains(DeSelectType.RESOURCE_REQUESTS)) {
List<ResourceRequest> resourceRequestsRaw = rm.getRMContext()
@ -226,12 +227,9 @@ public class AppInfo {
// copy preemption info fields
RMAppMetrics appMetrics = app.getRMAppMetrics();
numAMContainerPreempted =
appMetrics.getNumAMContainersPreempted();
preemptedResourceMB =
appMetrics.getResourcePreempted().getMemorySize();
numNonAMContainerPreempted =
appMetrics.getNumNonAMContainersPreempted();
numAMContainerPreempted = appMetrics.getNumAMContainersPreempted();
preemptedResourceMB = appMetrics.getResourcePreempted().getMemorySize();
numNonAMContainerPreempted = appMetrics.getNumNonAMContainersPreempted();
preemptedResourceVCores =
appMetrics.getResourcePreempted().getVirtualCores();
memorySeconds = appMetrics.getMemorySeconds();
@ -240,8 +238,7 @@ public class AppInfo {
preemptedVcoreSeconds = appMetrics.getPreemptedVcoreSeconds();
ApplicationSubmissionContext appSubmissionContext =
app.getApplicationSubmissionContext();
unmanagedApplication =
appSubmissionContext.getUnmanagedAM();
unmanagedApplication = appSubmissionContext.getUnmanagedAM();
appNodeLabelExpression =
app.getApplicationSubmissionContext().getNodeLabelExpression();
amNodeLabelExpression = (unmanagedApplication) ? null
@ -284,6 +281,7 @@ public class AppInfo {
timeouts.add(timeout);
}
}
}
}
@ -415,22 +413,6 @@ public class AppInfo {
return this.reservedVCores;
}
public long getPreemptedMB() {
return preemptedResourceMB;
}
public long getPreemptedVCores() {
return preemptedResourceVCores;
}
public int getNumNonAMContainersPreempted() {
return numNonAMContainerPreempted;
}
public int getNumAMContainersPreempted() {
return numAMContainerPreempted;
}
public long getMemorySeconds() {
return memorySeconds;
}
@ -446,10 +428,15 @@ public class AppInfo {
public long getPreemptedVcoreSeconds() {
return preemptedVcoreSeconds;
}
public List<ResourceRequestInfo> getResourceRequests() {
return this.resourceRequests;
}
public void setResourceRequests(List<ResourceRequestInfo> resourceRequests) {
this.resourceRequests = resourceRequests;
}
public LogAggregationStatus getLogAggregationStatus() {
return this.logAggregationStatus;
}
@ -473,4 +460,89 @@ public class AppInfo {
public ResourcesInfo getResourceInfo() {
return resourceInfo;
}
public long getPreemptedResourceMB() {
return preemptedResourceMB;
}
public void setPreemptedResourceMB(long preemptedResourceMB) {
this.preemptedResourceMB = preemptedResourceMB;
}
public long getPreemptedResourceVCores() {
return preemptedResourceVCores;
}
public void setPreemptedResourceVCores(long preemptedResourceVCores) {
this.preemptedResourceVCores = preemptedResourceVCores;
}
public int getNumNonAMContainerPreempted() {
return numNonAMContainerPreempted;
}
public void setNumNonAMContainerPreempted(int numNonAMContainerPreempted) {
this.numNonAMContainerPreempted = numNonAMContainerPreempted;
}
public int getNumAMContainerPreempted() {
return numAMContainerPreempted;
}
public void setNumAMContainerPreempted(int numAMContainerPreempted) {
this.numAMContainerPreempted = numAMContainerPreempted;
}
public void setPreemptedMemorySeconds(long preemptedMemorySeconds) {
this.preemptedMemorySeconds = preemptedMemorySeconds;
}
public void setPreemptedVcoreSeconds(long preemptedVcoreSeconds) {
this.preemptedVcoreSeconds = preemptedVcoreSeconds;
}
public void setAllocatedMB(long allocatedMB) {
this.allocatedMB = allocatedMB;
}
public void setAllocatedVCores(long allocatedVCores) {
this.allocatedVCores = allocatedVCores;
}
public void setReservedMB(long reservedMB) {
this.reservedMB = reservedMB;
}
public void setReservedVCores(long reservedVCores) {
this.reservedVCores = reservedVCores;
}
public void setRunningContainers(int runningContainers) {
this.runningContainers = runningContainers;
}
public void setMemorySeconds(long memorySeconds) {
this.memorySeconds = memorySeconds;
}
public void setVcoreSeconds(long vcoreSeconds) {
this.vcoreSeconds = vcoreSeconds;
}
public void setAppId(String appId) {
this.id = appId;
}
@VisibleForTesting
public void setAMHostHttpAddress(String amHost) {
this.amHostHttpAddress = amHost;
}
public void setState(YarnApplicationState state) {
this.state = state;
}
public void setName(String name) {
this.name = name;
}
}

View File

@ -40,4 +40,8 @@ public class AppsInfo {
return app;
}
public void addAll(ArrayList<AppInfo> appsInfo) {
app.addAll(appsInfo);
}
}

View File

@ -49,6 +49,8 @@ public final class RouterMetrics {
private MutableGaugeInt numAppsFailedKilled;
@Metric("# of application reports failed to be retrieved")
private MutableGaugeInt numAppsFailedRetrieved;
@Metric("# of multiple applications reports failed to be retrieved")
private MutableGaugeInt numMultipleAppsFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -59,6 +61,9 @@ public final class RouterMetrics {
private MutableRate totalSucceededAppsCreated;
@Metric("Total number of successful Retrieved app reports and latency(ms)")
private MutableRate totalSucceededAppsRetrieved;
@Metric("Total number of successful Retrieved multiple apps reports and "
+ "latency(ms)")
private MutableRate totalSucceededMultipleAppsRetrieved;
/**
* Provide quantile counters for all latencies.
@ -67,6 +72,7 @@ public final class RouterMetrics {
private MutableQuantiles getNewApplicationLatency;
private MutableQuantiles killApplicationLatency;
private MutableQuantiles getApplicationReportLatency;
private MutableQuantiles getApplicationsReportLatency;
private static volatile RouterMetrics INSTANCE = null;
private static MetricsRegistry registry;
@ -83,6 +89,9 @@ public final class RouterMetrics {
getApplicationReportLatency =
registry.newQuantiles("getApplicationReportLatency",
"latency of get application report", "ops", "latency", 10);
getApplicationsReportLatency =
registry.newQuantiles("getApplicationsReportLatency",
"latency of get applications report", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -124,6 +133,11 @@ public final class RouterMetrics {
return totalSucceededAppsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededMultipleAppsRetrieved() {
return totalSucceededMultipleAppsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
@ -144,6 +158,11 @@ public final class RouterMetrics {
return totalSucceededAppsRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededMultipleGetAppReport() {
return totalSucceededMultipleAppsRetrieved.lastStat().mean();
}
@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
@ -164,6 +183,11 @@ public final class RouterMetrics {
return numAppsFailedRetrieved.value();
}
@VisibleForTesting
public int getMultipleAppsFailedRetrieved() {
return numMultipleAppsFailedRetrieved.value();
}
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@ -184,6 +208,11 @@ public final class RouterMetrics {
getApplicationReportLatency.add(duration);
}
public void succeededMultipleAppsRetrieved(long duration) {
totalSucceededMultipleAppsRetrieved.add(duration);
getApplicationsReportLatency.add(duration);
}
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@ -200,4 +229,8 @@ public final class RouterMetrics {
numAppsFailedRetrieved.incr();
}
public void incrMultipleAppsFailedRetrieved() {
numMultipleAppsFailedRetrieved.incr();
}
}

View File

@ -25,6 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -102,9 +107,15 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
private RouterPolicyFacade policyFacade;
private RouterMetrics routerMetrics;
private final Clock clock = new MonotonicClock();
private boolean returnPartialReport;
private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
/**
* Thread pool used for asynchronous operations.
*/
private ExecutorService threadpool;
@Override
public void init(String user) {
federationFacade = FederationStateStoreFacade.getInstance();
@ -125,6 +136,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
routerMetrics = RouterMetrics.getMetrics();
threadpool = Executors.newCachedThreadPool();
returnPartialReport =
conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
}
private SubClusterId getRandomActiveSubCluster(
@ -586,6 +602,99 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
return response;
}
/**
* The Yarn Router will forward the request to all the Yarn RMs in parallel,
* after that it will group all the ApplicationReports by the ApplicationId.
* <p>
* Possible failure:
* <p>
* Client: identical behavior as {@code RMWebServices}.
* <p>
* Router: the Client will timeout and resubmit the request.
* <p>
* ResourceManager: the Router calls each Yarn RM in parallel by using one
* thread for each Yarn RM. In case a Yarn RM fails, a single call will
* timeout. However the Router will merge the ApplicationReports it got, and
* provides a partial list to the client.
* <p>
* State Store: the Router will timeout and it will retry depending on the
* FederationFacade settings - if the failure happened before the select
* operation.
*/
@Override
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
Set<String> statesQuery, String finalStatusQuery, String userQuery,
String queueQuery, String count, String startedBegin, String startedEnd,
String finishBegin, String finishEnd, Set<String> applicationTypes,
Set<String> applicationTags, Set<String> unselectedFields) {
AppsInfo apps = new AppsInfo();
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClustersActive = null;
try {
subClustersActive = federationFacade.getSubClusters(true);
} catch (YarnException e) {
routerMetrics.incrMultipleAppsFailedRetrieved();
return null;
}
// Send the requests in parallel
ExecutorCompletionService<AppsInfo> compSvc =
new ExecutorCompletionService<AppsInfo>(this.threadpool);
for (final SubClusterInfo info : subClustersActive.values()) {
compSvc.submit(new Callable<AppsInfo>() {
@Override
public AppsInfo call() {
DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
info.getClientRMServiceAddress());
AppsInfo rmApps = interceptor.getApps(hsr, stateQuery, statesQuery,
finalStatusQuery, userQuery, queueQuery, count, startedBegin,
startedEnd, finishBegin, finishEnd, applicationTypes,
applicationTags, unselectedFields);
if (rmApps == null) {
routerMetrics.incrMultipleAppsFailedRetrieved();
LOG.error("Subcluster " + info.getSubClusterId()
+ " failed to return appReport.");
return null;
}
return rmApps;
}
});
}
// Collect all the responses in parallel
for (int i = 0; i < subClustersActive.values().size(); i++) {
try {
Future<AppsInfo> future = compSvc.take();
AppsInfo appsResponse = future.get();
long stopTime = clock.getTime();
routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
if (appsResponse != null) {
apps.addAll(appsResponse.getApps());
}
} catch (Throwable e) {
routerMetrics.incrMultipleAppsFailedRetrieved();
LOG.warn("Failed to get application report ", e);
}
}
if (apps.getApps().isEmpty()) {
return null;
}
// Merge all the application reports got from all the available Yarn RMs
return RouterWebServiceUtil.mergeAppsInfo(apps.getApps(),
returnPartialReport);
}
@Override
public ClusterInfo get() {
return getClusterInfo();
@ -639,15 +748,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
throw new NotImplementedException();
}
@Override
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
Set<String> statesQuery, String finalStatusQuery, String userQuery,
String queueQuery, String count, String startedBegin, String startedEnd,
String finishBegin, String finishEnd, Set<String> applicationTypes,
Set<String> applicationTags, Set<String> unselectedFields) {
throw new NotImplementedException();
}
@Override
public AppState getAppState(HttpServletRequest hsr, String appId)
throws AuthorizationException {

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -33,7 +35,11 @@ import javax.ws.rs.core.Response.ResponseBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
@ -55,6 +61,8 @@ public final class RouterWebServiceUtil {
private static final Log LOG =
LogFactory.getLog(RouterWebServiceUtil.class.getName());
private final static String PARTIAL_REPORT = "Partial Report ";
/** Disable constructor. */
private RouterWebServiceUtil() {
}
@ -225,4 +233,103 @@ public final class RouterWebServiceUtil {
}
/**
* Merges a list of AppInfo grouping by ApplicationId. Our current policy
* is to merge the application reports from the reacheable SubClusters.
* Via configuration parameter, we decide whether to return applications
* for which the primary AM is missing or to omit them.
*
* @param appsInfo a list of AppInfo to merge
* @param returnPartialResult if the merge AppsInfo should contain partial
* result or not
* @return the merged AppsInfo
*/
public static AppsInfo mergeAppsInfo(ArrayList<AppInfo> appsInfo,
boolean returnPartialResult) {
AppsInfo allApps = new AppsInfo();
Map<String, AppInfo> federationAM = new HashMap<String, AppInfo>();
Map<String, AppInfo> federationUAMSum = new HashMap<String, AppInfo>();
for (AppInfo a : appsInfo) {
// Check if this AppInfo is an AM
if (a.getAMHostHttpAddress() != null) {
// Insert in the list of AM
federationAM.put(a.getAppId(), a);
// Check if there are any UAM found before
if (federationUAMSum.containsKey(a.getAppId())) {
// Merge the current AM with the found UAM
mergeAMWithUAM(a, federationUAMSum.get(a.getAppId()));
// Remove the sum of the UAMs
federationUAMSum.remove(a.getAppId());
}
// This AppInfo is an UAM
} else {
if (federationAM.containsKey(a.getAppId())) {
// Merge the current UAM with its own AM
mergeAMWithUAM(federationAM.get(a.getAppId()), a);
} else if (federationUAMSum.containsKey(a.getAppId())) {
// Merge the current UAM with its own UAM and update the list of UAM
federationUAMSum.put(a.getAppId(),
mergeUAMWithUAM(federationUAMSum.get(a.getAppId()), a));
} else {
// Insert in the list of UAM
federationUAMSum.put(a.getAppId(), a);
}
}
}
// Check the remaining UAMs are depending or not from federation
for (AppInfo a : federationUAMSum.values()) {
if (returnPartialResult || (a.getName() != null
&& !(a.getName().startsWith(UnmanagedApplicationManager.APP_NAME)
|| a.getName().startsWith(PARTIAL_REPORT)))) {
federationAM.put(a.getAppId(), a);
}
}
allApps.addAll(new ArrayList<AppInfo>(federationAM.values()));
return allApps;
}
private static AppInfo mergeUAMWithUAM(AppInfo uam1, AppInfo uam2) {
AppInfo partialReport = new AppInfo();
partialReport.setAppId(uam1.getAppId());
partialReport.setName(PARTIAL_REPORT + uam1.getAppId());
// We pick the status of the first uam
partialReport.setState(uam1.getState());
// Merge the newly partial AM with UAM1 and then with UAM2
mergeAMWithUAM(partialReport, uam1);
mergeAMWithUAM(partialReport, uam2);
return partialReport;
}
private static void mergeAMWithUAM(AppInfo am, AppInfo uam) {
am.setPreemptedResourceMB(
am.getPreemptedResourceMB() + uam.getPreemptedResourceMB());
am.setPreemptedResourceVCores(
am.getPreemptedResourceVCores() + uam.getPreemptedResourceVCores());
am.setNumNonAMContainerPreempted(am.getNumNonAMContainerPreempted()
+ uam.getNumNonAMContainerPreempted());
am.setNumAMContainerPreempted(
am.getNumAMContainerPreempted() + uam.getNumAMContainerPreempted());
am.setPreemptedMemorySeconds(
am.getPreemptedMemorySeconds() + uam.getPreemptedMemorySeconds());
am.setPreemptedVcoreSeconds(
am.getPreemptedVcoreSeconds() + uam.getPreemptedVcoreSeconds());
if (am.getState() == YarnApplicationState.RUNNING
&& uam.getState() == am.getState()) {
am.getResourceRequests().addAll(uam.getResourceRequests());
am.setAllocatedMB(am.getAllocatedMB() + uam.getAllocatedMB());
am.setAllocatedVCores(am.getAllocatedVCores() + uam.getAllocatedVCores());
am.setReservedMB(am.getReservedMB() + uam.getReservedMB());
am.setReservedVCores(am.getReservedVCores() + uam.getReservedMB());
am.setRunningContainers(
am.getRunningContainers() + uam.getRunningContainers());
am.setMemorySeconds(am.getMemorySeconds() + uam.getMemorySeconds());
am.setVcoreSeconds(am.getVcoreSeconds() + uam.getVcoreSeconds());
}
}
}

View File

@ -196,6 +196,45 @@ public class TestRouterMetrics {
Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedRetrieved());
}
/**
* This test validates the correctness of the metric: Retrieved Multiple Apps
* successfully.
*/
@Test
public void testSucceededMultipleAppsReport() {
long totalGoodBefore = metrics.getNumSucceededMultipleAppsRetrieved();
goodSubCluster.getApplicationsReport(100);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededMultipleAppsRetrieved());
Assert.assertEquals(100, metrics.getLatencySucceededMultipleGetAppReport(),
0);
goodSubCluster.getApplicationsReport(200);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededMultipleAppsRetrieved());
Assert.assertEquals(150, metrics.getLatencySucceededMultipleGetAppReport(),
0);
}
/**
* This test validates the correctness of the metric: Failed to retrieve
* Multiple Apps.
*/
@Test
public void testMulipleAppsReportFailed() {
long totalBadbefore = metrics.getMultipleAppsFailedRetrieved();
badSubCluster.getApplicationsReport();
Assert.assertEquals(totalBadbefore + 1,
metrics.getMultipleAppsFailedRetrieved());
}
// Records failures for all calls
private class MockBadSubCluster {
public void getNewApplication() {
@ -217,6 +256,11 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed getApplicationReport call");
metrics.incrAppsFailedRetrieved();
}
public void getApplicationsReport() {
LOG.info("Mocked: failed getApplicationsReport call");
metrics.incrMultipleAppsFailedRetrieved();
}
}
// Records successes for all calls
@ -244,5 +288,11 @@ public class TestRouterMetrics {
duration);
metrics.succeededAppsRetrieved(duration);
}
public void getApplicationsReport(long duration) {
LOG.info("Mocked: successful getApplicationsReport call with duration {}",
duration);
metrics.succeededMultipleAppsRetrieved(duration);
}
}
}

View File

@ -18,26 +18,32 @@
package org.apache.hadoop.yarn.server.router.webapp;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.net.ConnectException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class mocks the RESTRequestInterceptor.
*/
@ -100,6 +106,27 @@ public class MockDefaultRequestInterceptorREST
return new AppInfo();
}
@Override
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
Set<String> statesQuery, String finalStatusQuery, String userQuery,
String queueQuery, String count, String startedBegin, String startedEnd,
String finishBegin, String finishEnd, Set<String> applicationTypes,
Set<String> applicationTags, Set<String> unselectedFields) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
AppsInfo appsInfo = new AppsInfo();
AppInfo appInfo = new AppInfo();
appInfo.setAppId(
ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
applicationCounter.incrementAndGet()).toString());
appInfo.setAMHostHttpAddress("http://i_am_the_AM:1234");
appsInfo.add(appInfo);
return appsInfo;
}
@Override
public Response updateAppState(AppState targetState, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUt
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.junit.Assert;
import org.junit.Test;
@ -374,4 +375,20 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
Assert.assertNull(response);
}
/**
* This test validates the correctness of GetApplicationsReport in case each
* subcluster provided one application.
*/
@Test
public void testGetApplicationsReport()
throws YarnException, IOException, InterruptedException {
AppsInfo responseGet = interceptor.getApps(null, null, null, null, null,
null, null, null, null, null, null, null, null, null);
Assert.assertNotNull(responseGet);
Assert.assertEquals(NUM_SUBCLUSTER, responseGet.getApps().size());
// The merged operations will be tested in TestRouterWebServiceUtil
}
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
@ -271,4 +272,48 @@ public class TestFederationInterceptorRESTRetry
.getApplicationHomeSubCluster().getHomeSubCluster());
}
/**
* This test validates the correctness of GetApps in case the cluster is
* composed of only 1 bad SubCluster.
*/
@Test
public void testGetAppsOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
null, null, null, null, null, null, null, null);
Assert.assertNull(response);
}
/**
* This test validates the correctness of GetApps in case the cluster is
* composed of only 2 bad SubClusters.
*/
@Test
public void testGetAppsTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
null, null, null, null, null, null, null, null);
Assert.assertNull(response);
}
/**
* This test validates the correctness of GetApps in case the cluster is
* composed of only 1 bad SubCluster and a good one.
*/
@Test
public void testGetAppsOneBadOneGood()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(good, bad2));
AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
null, null, null, null, null, null, null, null);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getApps().size());
}
}

View File

@ -0,0 +1,311 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.junit.Assert;
import org.junit.Test;
/**
* Test class to validate RouterWebServiceUtil methods.
*/
public class TestRouterWebServiceUtil {
private static final ApplicationId APPID1 = ApplicationId.newInstance(1, 1);
private static final ApplicationId APPID2 = ApplicationId.newInstance(2, 1);
private static final ApplicationId APPID3 = ApplicationId.newInstance(3, 1);
private static final ApplicationId APPID4 = ApplicationId.newInstance(4, 1);
/**
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
* in case we want to merge 4 AMs. The expected result would be the same 4
* AMs.
*/
@Test
public void testMerge4DifferentApps() {
AppsInfo apps = new AppsInfo();
int value = 1000;
AppInfo app1 = new AppInfo();
app1.setAppId(APPID1.toString());
app1.setAMHostHttpAddress("http://i_am_the_AM1:1234");
app1.setState(YarnApplicationState.FINISHED);
app1.setNumAMContainerPreempted(value);
apps.add(app1);
AppInfo app2 = new AppInfo();
app2.setAppId(APPID2.toString());
app2.setAMHostHttpAddress("http://i_am_the_AM2:1234");
app2.setState(YarnApplicationState.ACCEPTED);
app2.setAllocatedVCores(2 * value);
apps.add(app2);
AppInfo app3 = new AppInfo();
app3.setAppId(APPID3.toString());
app3.setAMHostHttpAddress("http://i_am_the_AM3:1234");
app3.setState(YarnApplicationState.RUNNING);
app3.setReservedMB(3 * value);
apps.add(app3);
AppInfo app4 = new AppInfo();
app4.setAppId(APPID4.toString());
app4.setAMHostHttpAddress("http://i_am_the_AM4:1234");
app4.setState(YarnApplicationState.NEW);
app4.setAllocatedMB(4 * value);
apps.add(app4);
AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
Assert.assertNotNull(result);
Assert.assertEquals(4, result.getApps().size());
List<String> appIds = new ArrayList<String>();
AppInfo appInfo1 = null, appInfo2 = null, appInfo3 = null, appInfo4 = null;
for (AppInfo app : result.getApps()) {
appIds.add(app.getAppId());
if (app.getAppId().equals(APPID1.toString())) {
appInfo1 = app;
}
if (app.getAppId().equals(APPID2.toString())) {
appInfo2 = app;
}
if (app.getAppId().equals(APPID3.toString())) {
appInfo3 = app;
}
if (app.getAppId().equals(APPID4.toString())) {
appInfo4 = app;
}
}
Assert.assertTrue(appIds.contains(APPID1.toString()));
Assert.assertTrue(appIds.contains(APPID2.toString()));
Assert.assertTrue(appIds.contains(APPID3.toString()));
Assert.assertTrue(appIds.contains(APPID4.toString()));
// Check preservations APP1
Assert.assertEquals(app1.getState(), appInfo1.getState());
Assert.assertEquals(app1.getNumAMContainerPreempted(),
appInfo1.getNumAMContainerPreempted());
// Check preservations APP2
Assert.assertEquals(app2.getState(), appInfo2.getState());
Assert.assertEquals(app3.getAllocatedVCores(),
appInfo3.getAllocatedVCores());
// Check preservations APP3
Assert.assertEquals(app3.getState(), appInfo3.getState());
Assert.assertEquals(app3.getReservedMB(), appInfo3.getReservedMB());
// Check preservations APP3
Assert.assertEquals(app4.getState(), appInfo4.getState());
Assert.assertEquals(app3.getAllocatedMB(), appInfo3.getAllocatedMB());
}
/**
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
* in case we want to merge 2 UAMs and their own AM. The status of the AM is
* FINISHED, so we check the correctness of the merging of the historical
* values. The expected result would be 1 report with the merged information.
*/
@Test
public void testMergeAppsFinished() {
AppsInfo apps = new AppsInfo();
String amHost = "http://i_am_the_AM1:1234";
AppInfo am = new AppInfo();
am.setAppId(APPID1.toString());
am.setAMHostHttpAddress(amHost);
am.setState(YarnApplicationState.FINISHED);
int value = 1000;
setAppInfoFinished(am, value);
apps.add(am);
AppInfo uam1 = new AppInfo();
uam1.setAppId(APPID1.toString());
apps.add(uam1);
setAppInfoFinished(uam1, value);
AppInfo uam2 = new AppInfo();
uam2.setAppId(APPID1.toString());
apps.add(uam2);
setAppInfoFinished(uam2, value);
// in this case the result does not change if we enable partial result
AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
Assert.assertNotNull(result);
Assert.assertEquals(1, result.getApps().size());
AppInfo app = result.getApps().get(0);
Assert.assertEquals(APPID1.toString(), app.getAppId());
Assert.assertEquals(amHost, app.getAMHostHttpAddress());
Assert.assertEquals(value * 3, app.getPreemptedResourceMB());
Assert.assertEquals(value * 3, app.getPreemptedResourceVCores());
Assert.assertEquals(value * 3, app.getNumNonAMContainerPreempted());
Assert.assertEquals(value * 3, app.getNumAMContainerPreempted());
Assert.assertEquals(value * 3, app.getPreemptedMemorySeconds());
Assert.assertEquals(value * 3, app.getPreemptedVcoreSeconds());
}
private void setAppInfoFinished(AppInfo am, int value) {
am.setPreemptedResourceMB(value);
am.setPreemptedResourceVCores(value);
am.setNumNonAMContainerPreempted(value);
am.setNumAMContainerPreempted(value);
am.setPreemptedMemorySeconds(value);
am.setPreemptedVcoreSeconds(value);
}
/**
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
* in case we want to merge 2 UAMs and their own AM. The status of the AM is
* RUNNING, so we check the correctness of the merging of the runtime values.
* The expected result would be 1 report with the merged information.
*/
@Test
public void testMergeAppsRunning() {
AppsInfo apps = new AppsInfo();
String amHost = "http://i_am_the_AM2:1234";
AppInfo am = new AppInfo();
am.setAppId(APPID2.toString());
am.setAMHostHttpAddress(amHost);
am.setState(YarnApplicationState.RUNNING);
int value = 1000;
setAppInfoRunning(am, value);
apps.add(am);
AppInfo uam1 = new AppInfo();
uam1.setAppId(APPID2.toString());
uam1.setState(YarnApplicationState.RUNNING);
apps.add(uam1);
setAppInfoRunning(uam1, value);
AppInfo uam2 = new AppInfo();
uam2.setAppId(APPID2.toString());
uam2.setState(YarnApplicationState.RUNNING);
apps.add(uam2);
setAppInfoRunning(uam2, value);
// in this case the result does not change if we enable partial result
AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
Assert.assertNotNull(result);
Assert.assertEquals(1, result.getApps().size());
AppInfo app = result.getApps().get(0);
Assert.assertEquals(APPID2.toString(), app.getAppId());
Assert.assertEquals(amHost, app.getAMHostHttpAddress());
Assert.assertEquals(value * 3, app.getAllocatedMB());
Assert.assertEquals(value * 3, app.getAllocatedVCores());
Assert.assertEquals(value * 3, app.getReservedMB());
Assert.assertEquals(value * 3, app.getReservedVCores());
Assert.assertEquals(value * 3, app.getRunningContainers());
Assert.assertEquals(value * 3, app.getMemorySeconds());
Assert.assertEquals(value * 3, app.getVcoreSeconds());
Assert.assertEquals(3, app.getResourceRequests().size());
}
private void setAppInfoRunning(AppInfo am, int value) {
am.getResourceRequests().add(new ResourceRequestInfo());
am.setAllocatedMB(value);
am.setAllocatedVCores(value);
am.setReservedMB(value);
am.setReservedVCores(value);
am.setRunningContainers(value);
am.setMemorySeconds(value);
am.setVcoreSeconds(value);
}
/**
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
* in case we want to merge 2 UAMs without their own AM. The expected result
* would be an empty report or a partial report of the 2 UAMs depending on the
* selected policy.
*/
@Test
public void testMerge2UAM() {
AppsInfo apps = new AppsInfo();
AppInfo app1 = new AppInfo();
app1.setAppId(APPID1.toString());
app1.setName(UnmanagedApplicationManager.APP_NAME);
app1.setState(YarnApplicationState.RUNNING);
apps.add(app1);
AppInfo app2 = new AppInfo();
app2.setAppId(APPID1.toString());
app2.setName(UnmanagedApplicationManager.APP_NAME);
app2.setState(YarnApplicationState.RUNNING);
apps.add(app2);
AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
Assert.assertNotNull(result);
Assert.assertEquals(0, result.getApps().size());
// By enabling partial result, the expected result would be a partial report
// of the 2 UAMs
AppsInfo result2 = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), true);
Assert.assertNotNull(result2);
Assert.assertEquals(1, result2.getApps().size());
Assert.assertEquals(YarnApplicationState.RUNNING,
result2.getApps().get(0).getState());
}
/**
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
* in case we want to merge 1 UAM that does not depend on Federation. The
* excepted result would be the same app report.
*/
@Test
public void testMergeUAM() {
AppsInfo apps = new AppsInfo();
AppInfo app1 = new AppInfo();
app1.setAppId(APPID1.toString());
app1.setName("Test");
apps.add(app1);
// in this case the result does not change if we enable partial result
AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
Assert.assertNotNull(result);
Assert.assertEquals(1, result.getApps().size());
}
}