YARN-11371. [Federation] Refactor FederationInterceptorREST#createNewApplication\submitApplication Use FederationActionRetry. (#5130)

This commit is contained in:
slfan1989 2022-11-23 06:38:24 +08:00 committed by GitHub
parent 3c37a01654
commit 7cb22eb72d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 339 additions and 323 deletions

View File

@ -22,8 +22,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.Random;
import javax.cache.Cache;
import javax.cache.CacheManager;
@ -38,6 +40,8 @@ import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import javax.cache.spi.CachingProvider;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
@ -50,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
@ -110,6 +116,8 @@ public final class FederationStateStoreFacade {
private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();
private static Random rand = new Random(System.currentTimeMillis());
private FederationStateStore stateStore;
private int cacheTimeToLive;
private Configuration conf;
@ -496,6 +504,7 @@ public final class FederationStateStoreFacade {
* @param defaultValue the default implementation for fallback
* @param type the class for which a retry proxy is required
* @param retryPolicy the policy for retrying method call failures
* @param <T> The type of the instance
* @return a retry proxy for the specified interface
*/
public static <T> Object createRetryInstance(Configuration conf,
@ -731,7 +740,7 @@ public final class FederationStateStoreFacade {
return stateStore;
}
/*
/**
* The Router Supports Store NewMasterKey (RouterMasterKey{@link RouterMasterKey}).
*
* @param newKey Key used for generating and verifying delegation tokens
@ -849,4 +858,163 @@ public final class FederationStateStoreFacade {
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
return stateStore.getTokenByRouterStoreToken(request);
}
/**
* Get the number of active cluster nodes.
*
* @return number of active cluster nodes.
* @throws YarnException if the call to the state store is unsuccessful.
*/
public int getActiveSubClustersCount() throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubClusters = getSubClusters(true);
if (activeSubClusters == null || activeSubClusters.isEmpty()) {
return 0;
} else {
return activeSubClusters.size();
}
}
/**
* Randomly pick ActiveSubCluster.
* During the selection process, we will exclude SubClusters from the blacklist.
*
* @param activeSubClusters List of active subClusters.
* @param blackList blacklist.
* @return Active SubClusterId.
* @throws YarnException When there is no Active SubCluster,
* an exception will be thrown (No active SubCluster available to submit the request.)
*/
public static SubClusterId getRandomActiveSubCluster(
Map<SubClusterId, SubClusterInfo> activeSubClusters, List<SubClusterId> blackList)
throws YarnException {
// Check if activeSubClusters is empty, if it is empty, we need to throw an exception
if (MapUtils.isEmpty(activeSubClusters)) {
throw new FederationPolicyException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
}
// Change activeSubClusters to List
List<SubClusterId> subClusterIds = new ArrayList<>(activeSubClusters.keySet());
// If the blacklist is not empty, we need to remove all the subClusters in the blacklist
if (CollectionUtils.isNotEmpty(blackList)) {
subClusterIds.removeAll(blackList);
}
// Check there are still active subcluster after removing the blacklist
if (CollectionUtils.isEmpty(subClusterIds)) {
throw new FederationPolicyException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
}
// Randomly choose a SubCluster
return subClusterIds.get(rand.nextInt(subClusterIds.size()));
}
/**
* Get the number of retries.
*
* @param configRetries User-configured number of retries.
* @return number of retries.
* @throws YarnException yarn exception.
*/
public int getRetryNumbers(int configRetries) throws YarnException {
int activeSubClustersCount = getActiveSubClustersCount();
int actualRetryNums = Math.min(activeSubClustersCount, configRetries);
// Normally, we don't set a negative number for the number of retries,
// but if the user sets a negative number for the number of retries,
// we will return 0
if (actualRetryNums < 0) {
return 0;
}
return actualRetryNums;
}
/**
* Query SubClusterId By applicationId.
*
* If SubClusterId is not empty, it means it exists and returns true;
* if SubClusterId is empty, it means it does not exist and returns false.
*
* @param applicationId applicationId
* @return true, SubClusterId exists; false, SubClusterId not exists.
*/
public boolean existsApplicationHomeSubCluster(ApplicationId applicationId) {
try {
SubClusterId subClusterId = getApplicationHomeSubCluster(applicationId);
if (subClusterId != null) {
return true;
}
} catch (YarnException e) {
LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e);
}
return false;
}
/**
* Add ApplicationHomeSubCluster to FederationStateStore.
*
* @param applicationId applicationId.
* @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
* @throws YarnException yarn exception.
*/
public void addApplicationHomeSubCluster(ApplicationId applicationId,
ApplicationHomeSubCluster homeSubCluster) throws YarnException {
try {
addApplicationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
String msg = String.format(
"Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId);
throw new YarnException(msg, e);
}
}
/**
* Update ApplicationHomeSubCluster to FederationStateStore.
*
* @param subClusterId homeSubClusterId
* @param applicationId applicationId.
* @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
* @throws YarnException yarn exception.
*/
public void updateApplicationHomeSubCluster(SubClusterId subClusterId,
ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException {
try {
updateApplicationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
SubClusterId subClusterIdInStateStore = getApplicationHomeSubCluster(applicationId);
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Application {} already submitted on SubCluster {}.", applicationId, subClusterId);
} else {
String msg = String.format(
"Unable to update the ApplicationId %s into the FederationStateStore.", applicationId);
throw new YarnException(msg, e);
}
}
}
/**
* Add or Update ApplicationHomeSubCluster.
*
* @param applicationId applicationId, is the id of the application.
* @param subClusterId homeSubClusterId, this is selected by strategy.
* @param retryCount number of retries.
* @throws YarnException yarn exception.
*/
public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId,
SubClusterId subClusterId, int retryCount) throws YarnException {
Boolean exists = existsApplicationHomeSubCluster(applicationId);
ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
if (!exists || retryCount == 0) {
// persist the mapping of applicationId and the subClusterId which has
// been selected as its home.
addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
} else {
// update the mapping of applicationId and the home subClusterId to
// the new subClusterId we have selected.
updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
}
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.router;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -30,9 +28,6 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,8 +36,6 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.io.IOException;
/**
@ -61,8 +54,6 @@ public final class RouterServerUtil {
private static final String EPOCH_PREFIX = "e";
private static Random rand = new Random(System.currentTimeMillis());
/** Disable constructor. */
private RouterServerUtil() {
}
@ -479,42 +470,6 @@ public final class RouterServerUtil {
}
}
/**
* Randomly pick ActiveSubCluster.
* During the selection process, we will exclude SubClusters from the blacklist.
*
* @param activeSubClusters List of active subClusters.
* @param blackList blacklist.
* @return Active SubClusterId.
* @throws YarnException When there is no Active SubCluster,
* an exception will be thrown (No active SubCluster available to submit the request.)
*/
public static SubClusterId getRandomActiveSubCluster(
Map<SubClusterId, SubClusterInfo> activeSubClusters, List<SubClusterId> blackList)
throws YarnException {
// Check if activeSubClusters is empty, if it is empty, we need to throw an exception
if (MapUtils.isEmpty(activeSubClusters)) {
logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
}
// Change activeSubClusters to List
List<SubClusterId> subClusterIds = new ArrayList<>(activeSubClusters.keySet());
// If the blacklist is not empty, we need to remove all the subClusters in the blacklist
if (CollectionUtils.isNotEmpty(blackList)) {
subClusterIds.removeAll(blackList);
}
// Check there are still active subcluster after removing the blacklist
if (CollectionUtils.isEmpty(subClusterIds)) {
logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
}
// Randomly choose a SubCluster
return subClusterIds.get(rand.nextInt(subClusterIds.size()));
}
public static UserGroupInformation setupUser(final String userName) {
UserGroupInformation user = null;
try {

View File

@ -272,17 +272,6 @@ public class FederationClientInterceptor
return list.get(rand.nextInt(list.size()));
}
@VisibleForTesting
private int getActiveSubClustersCount() throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubClusters =
federationFacade.getSubClusters(true);
if (activeSubClusters == null || activeSubClusters.isEmpty()) {
return 0;
} else {
return activeSubClusters.size();
}
}
/**
* YARN Router forwards every getNewApplication requests to any RM. During
* this operation there will be no communication with the State Store. The
@ -318,7 +307,7 @@ public class FederationClientInterceptor
// Try calling the getNewApplication method
List<SubClusterId> blacklist = new ArrayList<>();
int activeSubClustersCount = getActiveSubClustersCount();
int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
try {
@ -361,7 +350,7 @@ public class FederationClientInterceptor
List<SubClusterId> blackList, GetNewApplicationRequest request, int retryCount)
throws YarnException, IOException {
SubClusterId subClusterId =
RouterServerUtil.getRandomActiveSubCluster(subClustersActive, blackList);
federationFacade.getRandomActiveSubCluster(subClustersActive, blackList);
LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, subClusterId);
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
try {
@ -474,7 +463,7 @@ public class FederationClientInterceptor
// the user will provide us with an expected submitRetries,
// but if the number of Active SubClusters is less than this number at this time,
// we should provide a high number of retry according to the number of Active SubClusters.
int activeSubClustersCount = getActiveSubClustersCount();
int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
// Try calling the SubmitApplication method
@ -542,17 +531,10 @@ public class FederationClientInterceptor
LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
applicationId, retryCount, subClusterId);
// Step2. Query homeSubCluster according to ApplicationId.
Boolean exists = existsApplicationHomeSubCluster(applicationId);
ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
if (!exists || retryCount == 0) {
addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
} else {
updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
}
// Step2. We Store the mapping relationship
// between Application and HomeSubCluster in stateStore.
federationFacade.addOrUpdateApplicationHomeSubCluster(
applicationId, subClusterId, retryCount);
// Step3. SubmitApplication to the subCluster
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
@ -581,70 +563,6 @@ public class FederationClientInterceptor
throw new YarnException(msg);
}
/**
* Add ApplicationHomeSubCluster to FederationStateStore.
*
* @param applicationId applicationId.
* @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
* @throws YarnException yarn exception.
*/
private void addApplicationHomeSubCluster(ApplicationId applicationId,
ApplicationHomeSubCluster homeSubCluster) throws YarnException {
try {
federationFacade.addApplicationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
RouterServerUtil.logAndThrowException(e,
"Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId);
}
}
/**
* Update ApplicationHomeSubCluster to FederationStateStore.
*
* @param subClusterId homeSubClusterId
* @param applicationId applicationId.
* @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
* @throws YarnException yarn exception.
*/
private void updateApplicationHomeSubCluster(SubClusterId subClusterId,
ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException {
try {
federationFacade.updateApplicationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
SubClusterId subClusterIdInStateStore =
federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Application {} already submitted on SubCluster {}.",
applicationId, subClusterId);
} else {
RouterServerUtil.logAndThrowException(e,
"Unable to update the ApplicationId %s into the FederationStateStore.",
applicationId);
}
}
}
/**
* Query SubClusterId By applicationId.
*
* If SubClusterId is not empty, it means it exists and returns true;
* if SubClusterId is empty, it means it does not exist and returns false.
*
* @param applicationId applicationId
* @return true, SubClusterId exists; false, SubClusterId not exists.
*/
private boolean existsApplicationHomeSubCluster(ApplicationId applicationId) {
try {
SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId != null) {
return true;
}
} catch (YarnException e) {
LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e);
}
return false;
}
/**
* The YARN Router will forward to the respective YARN RM in which the AM is
* running.

View File

@ -27,14 +27,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
@ -62,9 +61,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@ -134,13 +134,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
private int numSubmitRetries;
private FederationStateStoreFacade federationFacade;
private Random rand;
private RouterPolicyFacade policyFacade;
private RouterMetrics routerMetrics;
private final Clock clock = new MonotonicClock();
private boolean returnPartialReport;
private boolean appInfosCacheEnabled;
private int appInfosCacheCount;
private long submitIntervalTime;
private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
private LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> appInfosCaches;
@ -156,7 +156,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
super.init(user);
federationFacade = FederationStateStoreFacade.getInstance();
rand = new Random();
final Configuration conf = this.getConf();
@ -194,24 +193,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
YarnConfiguration.DEFAULT_ROUTER_APPSINFO_CACHED_COUNT);
appInfosCaches = new LRUCacheHashMap<>(appInfosCacheCount, true);
}
}
private SubClusterId getRandomActiveSubCluster(
Map<SubClusterId, SubClusterInfo> activeSubclusters,
List<SubClusterId> blackListSubClusters) throws YarnException {
if (activeSubclusters == null || activeSubclusters.size() < 1) {
RouterServerUtil.logAndThrowException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
}
Collection<SubClusterId> keySet = activeSubclusters.keySet();
FederationPolicyUtils.validateSubClusterAvailability(keySet, blackListSubClusters);
if (blackListSubClusters != null) {
keySet.removeAll(blackListSubClusters);
}
List<SubClusterId> list = keySet.stream().collect(Collectors.toList());
return list.get(rand.nextInt(list.size()));
submitIntervalTime = conf.getTimeDuration(
YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME,
YarnConfiguration.DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME, TimeUnit.MILLISECONDS);
}
@VisibleForTesting
@ -301,62 +286,79 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClustersActive;
try {
subClustersActive = federationFacade.getSubClusters(true);
} catch (YarnException e) {
routerMetrics.incrAppsFailedCreated();
return Response.status(Status.INTERNAL_SERVER_ERROR)
.entity(e.getLocalizedMessage()).build();
}
Map<SubClusterId, SubClusterInfo> subClustersActive =
federationFacade.getSubClusters(true);
List<SubClusterId> blacklist = new ArrayList<>();
for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId;
try {
subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
} catch (YarnException e) {
routerMetrics.incrAppsFailedCreated();
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(e.getLocalizedMessage()).build();
}
LOG.debug("getNewApplication try #{} on SubCluster {}.", i, subClusterId);
DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(subClusterId,
subClustersActive.get(subClusterId).getRMWebServiceAddress());
Response response = null;
try {
response = interceptor.createNewApplication(hsr);
} catch (Exception e) {
LOG.warn("Unable to create a new ApplicationId in SubCluster {}.",
subClusterId.getId(), e);
}
if (response != null &&
response.getStatus() == HttpServletResponse.SC_OK) {
// We declare blackList and retries.
List<SubClusterId> blackList = new ArrayList<>();
int actualRetryNums = federationFacade.getRetryNumbers(numSubmitRetries);
Response response = ((FederationActionRetry<Response>) (retryCount) ->
invokeGetNewApplication(subClustersActive, blackList, hsr, retryCount)).
runWithRetries(actualRetryNums, submitIntervalTime);
// If the response is not empty and the status is SC_OK,
// this request can be returned directly.
if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
long stopTime = clock.getTime();
routerMetrics.succeededAppsCreated(stopTime - startTime);
return response;
} else {
// Empty response from the ResourceManager.
// Blacklist this subcluster for this request.
blacklist.add(subClusterId);
}
} catch (FederationPolicyException e) {
// If a FederationPolicyException is thrown, the service is unavailable.
routerMetrics.incrAppsFailedCreated();
return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
} catch (Exception e) {
routerMetrics.incrAppsFailedCreated();
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getLocalizedMessage()).build();
}
// return error message directly.
String errMsg = "Fail to create a new application.";
LOG.error(errMsg);
routerMetrics.incrAppsFailedCreated();
return Response
.status(Status.INTERNAL_SERVER_ERROR)
.entity(errMsg)
.build();
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
}
/**
* Invoke GetNewApplication to different subClusters.
*
* @param subClustersActive Active SubClusters.
* @param blackList Blacklist avoid repeated calls to unavailable subCluster.
* @param hsr HttpServletRequest.
* @param retryCount number of retries.
* @return Get response, If the response is empty or status not equal SC_OK, the request fails,
* if the response is not empty and status equal SC_OK, the request is successful.
* @throws YarnException yarn exception.
* @throws IOException io error.
* @throws InterruptedException interrupted exception.
*/
private Response invokeGetNewApplication(Map<SubClusterId, SubClusterInfo> subClustersActive,
List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
throws YarnException, IOException, InterruptedException {
SubClusterId subClusterId =
federationFacade.getRandomActiveSubCluster(subClustersActive, blackList);
LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, subClusterId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subClusterId,
subClustersActive.get(subClusterId).getRMWebServiceAddress());
try {
Response response = interceptor.createNewApplication(hsr);
if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
return response;
}
} catch (Exception e) {
blackList.add(subClusterId);
RouterServerUtil.logAndThrowException(e.getMessage(), e);
}
// We need to throw the exception directly.
String msg = String.format("Unable to create a new ApplicationId in SubCluster %s.",
subClusterId.getId());
throw new YarnException(msg);
}
/**
@ -431,142 +433,106 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
long startTime = clock.getTime();
// We verify the parameters to ensure that newApp is not empty and
// that the format of applicationId is correct.
if (newApp == null || newApp.getApplicationId() == null) {
routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Missing ApplicationSubmissionContextInfo or "
+ "applicationSubmissionContext information.";
return Response
.status(Status.BAD_REQUEST)
.entity(errMsg)
.build();
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}
ApplicationId applicationId = null;
try {
applicationId = ApplicationId.fromString(newApp.getApplicationId());
String applicationId = newApp.getApplicationId();
RouterServerUtil.validateApplicationId(applicationId);
} catch (IllegalArgumentException e) {
routerMetrics.incrAppsFailedSubmitted();
return Response
.status(Status.BAD_REQUEST)
.entity(e.getLocalizedMessage())
.build();
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()).build();
}
List<SubClusterId> blacklist = new ArrayList<>();
for (int i = 0; i < numSubmitRetries; ++i) {
ApplicationSubmissionContext context =
RMWebAppUtil.createAppSubmissionContext(newApp, this.getConf());
SubClusterId subClusterId = null;
try {
subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
} catch (YarnException e) {
routerMetrics.incrAppsFailedSubmitted();
return Response
.status(Status.SERVICE_UNAVAILABLE)
.entity(e.getLocalizedMessage())
.build();
}
LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
applicationId, i, subClusterId);
ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
if (i == 0) {
try {
// persist the mapping of applicationId and the subClusterId which has
// been selected as its home
subClusterId =
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Unable to insert the ApplicationId " + applicationId
+ " into the FederationStateStore";
return Response
.status(Status.SERVICE_UNAVAILABLE)
.entity(errMsg + " " + e.getLocalizedMessage())
.build();
}
} else {
try {
// update the mapping of applicationId and the home subClusterId to
// the new subClusterId we have selected
federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
String errMsg = "Unable to update the ApplicationId " + applicationId
+ " into the FederationStateStore";
SubClusterId subClusterIdInStateStore;
try {
subClusterIdInStateStore =
federationFacade.getApplicationHomeSubCluster(applicationId);
} catch (YarnException e1) {
routerMetrics.incrAppsFailedSubmitted();
return Response
.status(Status.SERVICE_UNAVAILABLE)
.entity(e1.getLocalizedMessage())
.build();
}
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Application {} already submitted on SubCluster {}.",
applicationId, subClusterId);
} else {
routerMetrics.incrAppsFailedSubmitted();
return Response
.status(Status.SERVICE_UNAVAILABLE)
.entity(errMsg)
.build();
}
}
}
SubClusterInfo subClusterInfo;
try {
subClusterInfo = federationFacade.getSubCluster(subClusterId);
} catch (YarnException e) {
routerMetrics.incrAppsFailedSubmitted();
return Response
.status(Status.SERVICE_UNAVAILABLE)
.entity(e.getLocalizedMessage())
.build();
}
Response response = null;
try {
response = getOrCreateInterceptorForSubCluster(subClusterId,
subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp,
hsr);
} catch (Exception e) {
LOG.warn("Unable to submit the application {} to SubCluster {}",
applicationId, subClusterId.getId(), e);
}
if (response != null &&
response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
LOG.info("Application {} with appId {} submitted on {}",
context.getApplicationName(), applicationId, subClusterId);
List<SubClusterId> blackList = new ArrayList<>();
try {
int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
Response response = ((FederationActionRetry<Response>) (retryCount) ->
invokeSubmitApplication(newApp, blackList, hsr, retryCount)).
runWithRetries(actualRetryNums, submitIntervalTime);
if (response != null) {
long stopTime = clock.getTime();
routerMetrics.succeededAppsSubmitted(stopTime - startTime);
return response;
} else {
// Empty response from the ResourceManager.
// Blacklist this subcluster for this request.
blacklist.add(subClusterId);
}
} catch (Exception e) {
routerMetrics.incrAppsFailedSubmitted();
return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
}
routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Application " + newApp.getApplicationName()
+ " with appId " + applicationId + " failed to be submitted.";
String errMsg = String.format("Application %s with appId %s failed to be submitted.",
newApp.getApplicationName(), newApp.getApplicationId());
LOG.error(errMsg);
return Response
.status(Status.SERVICE_UNAVAILABLE)
.entity(errMsg)
.build();
return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build();
}
/**
* Invoke SubmitApplication to different subClusters.
*
* @param submissionContext application submission context.
* @param blackList Blacklist avoid repeated calls to unavailable subCluster.
* @param hsr HttpServletRequest.
* @param retryCount number of retries.
* @return Get response, If the response is empty or status not equal SC_ACCEPTED,
* the request fails, if the response is not empty and status equal SC_OK,
* the request is successful.
* @throws YarnException yarn exception.
* @throws IOException io error.
*/
private Response invokeSubmitApplication(ApplicationSubmissionContextInfo submissionContext,
List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
throws YarnException, IOException, InterruptedException {
// Step1. We convert ApplicationSubmissionContextInfo to ApplicationSubmissionContext
// and Prepare parameters.
ApplicationSubmissionContext context =
RMWebAppUtil.createAppSubmissionContext(submissionContext, this.getConf());
ApplicationId applicationId = ApplicationId.fromString(submissionContext.getApplicationId());
SubClusterId subClusterId = null;
try {
// Get subClusterId from policy.
subClusterId = policyFacade.getHomeSubcluster(context, blackList);
// Print the log of submitting the submitApplication.
LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
applicationId, retryCount, subClusterId);
// Step2. We Store the mapping relationship
// between Application and HomeSubCluster in stateStore.
federationFacade.addOrUpdateApplicationHomeSubCluster(
applicationId, subClusterId, retryCount);
// Step3. We get subClusterInfo based on subClusterId.
SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
// Step4. Submit the request, if the response is HttpServletResponse.SC_ACCEPTED,
// We return the response, otherwise we throw an exception.
Response response = getOrCreateInterceptorForSubCluster(subClusterId,
subClusterInfo.getRMWebServiceAddress()).submitApplication(submissionContext, hsr);
if (response != null && response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
LOG.info("Application {} with appId {} submitted on {}.",
context.getApplicationName(), applicationId, subClusterId);
return response;
}
String msg = String.format("application %s failed to be submitted.", applicationId);
throw new YarnException(msg);
} catch (Exception e) {
LOG.warn("Unable to submit the application {} to SubCluster {}.", applicationId,
subClusterId, e);
if (subClusterId != null) {
blackList.add(subClusterId);
}
throw e;
}
}
/**

View File

@ -180,6 +180,9 @@ public class TestFederationInterceptorRESTRetry
@Test
public void testGetNewApplicationTwoBadSCs()
throws YarnException, IOException, InterruptedException {
LOG.info("Test getNewApplication with two bad SCs.");
setupCluster(Arrays.asList(bad1, bad2));
Response response = interceptor.createNewApplication(null);
@ -195,17 +198,21 @@ public class TestFederationInterceptorRESTRetry
@Test
public void testGetNewApplicationOneBadOneGood()
throws YarnException, IOException, InterruptedException {
System.out.println("Test getNewApplication with one bad, one good SC");
LOG.info("Test getNewApplication with one bad, one good SC.");
setupCluster(Arrays.asList(good, bad2));
Response response = interceptor.createNewApplication(null);
Assert.assertNotNull(response);
Assert.assertEquals(OK, response.getStatus());
NewApplication newApp = (NewApplication) response.getEntity();
ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
Assert.assertNotNull(newApp);
Assert.assertEquals(Integer.parseInt(good.getId()),
appId.getClusterTimestamp());
ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
Assert.assertNotNull(appId);
Assert.assertEquals(Integer.parseInt(good.getId()), appId.getClusterTimestamp());
}
/**
@ -216,6 +223,8 @@ public class TestFederationInterceptorRESTRetry
public void testSubmitApplicationOneBadSC()
throws YarnException, IOException, InterruptedException {
LOG.info("Test submitApplication with one bad SC.");
setupCluster(Arrays.asList(bad2));
ApplicationId appId =