YARN-6896. Federation: routing REST invocations transparently to multiple RMs (part 1 - basic execution). (Contributed by Giovanni Matteo Fumarola via curino)

(cherry picked from commit cc59b5fb26)
Carlo Curino 2017-08-11 15:58:01 -07:00
parent a1ee4ad77f
commit 8220b19af7
10 changed files with 1650 additions and 18 deletions


@@ -2653,6 +2653,16 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.router.webapp."
+ "DefaultRequestInterceptorREST";
/**
* The interceptor class used in FederationInterceptorREST to communicate with
* each SubCluster.
*/
public static final String ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS =
ROUTER_WEBAPP_PREFIX + "default-interceptor-class";
public static final String DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS =
"org.apache.hadoop.yarn.server.router.webapp."
+ "DefaultRequestInterceptorREST";
////////////////////////////////
// Other Configs
////////////////////////////////
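Not part of this patch - a minimal sketch of how the new per-SubCluster interceptor knob could be overridden programmatically, assuming the resolved property name is yarn.router.webapp.default-interceptor-class and using a hypothetical org.example.MyInterceptorREST class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RouterInterceptorConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Override the REST interceptor used for each SubCluster; the class name
    // below is hypothetical and must extend DefaultRequestInterceptorREST.
    conf.set(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
        "org.example.MyInterceptorREST");
    // Falls back to DefaultRequestInterceptorREST when the key is unset.
    System.out.println(
        conf.get(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
            YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS));
  }
}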


@@ -83,6 +83,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS);
configurationPropsToSkipCompare
.add(YarnConfiguration.ROUTER_RMADMIN_ADDRESS);
configurationPropsToSkipCompare
.add(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS);
// Federation policies configs to be ignored
configurationPropsToSkipCompare


@@ -30,6 +30,7 @@ import javax.ws.rs.core.Response;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
@@ -66,10 +67,23 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
* implementation that simply forwards the client requests to the resource
* manager.
*/
public final class DefaultRequestInterceptorREST
public class DefaultRequestInterceptorREST
extends AbstractRESTRequestInterceptor {
private String webAppAddress;
private SubClusterId subClusterId = null;
public void setWebAppAddress(String webAppAddress) {
this.webAppAddress = webAppAddress;
}
protected void setSubClusterId(SubClusterId scId) {
this.subClusterId = scId;
}
protected SubClusterId getSubClusterId() {
return this.subClusterId;
}
@Override
public void init(String user) {


@@ -0,0 +1,750 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.util.*;
/**
* Extends the {@code AbstractRESTRequestInterceptor} class and provides an
* implementation for federation of YARN RM and scaling an application across
* multiple YARN SubClusters. All the federation specific implementation is
* encapsulated in this class. This is always the last interceptor in the chain.
*/
public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
private static final Logger LOG =
LoggerFactory.getLogger(FederationInterceptorREST.class);
private int numSubmitRetries;
private FederationStateStoreFacade federationFacade;
private Random rand;
private RouterPolicyFacade policyFacade;
private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
@Override
public void init(String user) {
federationFacade = FederationStateStoreFacade.getInstance();
rand = new Random(System.currentTimeMillis());
final Configuration conf = this.getConf();
try {
policyFacade = new RouterPolicyFacade(conf, federationFacade,
this.federationFacade.getSubClusterResolver(), null);
} catch (FederationPolicyInitializationException e) {
LOG.error(e.getMessage());
}
numSubmitRetries =
conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
}
private SubClusterId getRandomActiveSubCluster(
Map<SubClusterId, SubClusterInfo> activeSubclusters,
List<SubClusterId> blackListSubClusters) throws YarnException {
if (activeSubclusters == null || activeSubclusters.size() < 1) {
RouterServerUtil.logAndThrowException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
}
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
FederationPolicyUtils.validateSubClusterAvailability(list,
blackListSubClusters);
if (blackListSubClusters != null) {
// Remove the blacklisted SubClusters from the active SubClusters returned
// by the StateStore
for (SubClusterId scId : blackListSubClusters) {
list.remove(scId);
}
}
return list.get(rand.nextInt(list.size()));
}
@VisibleForTesting
protected DefaultRequestInterceptorREST getInterceptorForSubCluster(
SubClusterId subClusterId) {
if (interceptors.containsKey(subClusterId)) {
return interceptors.get(subClusterId);
} else {
LOG.error("The interceptor for SubCluster " + subClusterId
+ " does not exist in the cache.");
return null;
}
}
private DefaultRequestInterceptorREST createInterceptorForSubCluster(
SubClusterId subClusterId, String webAppAddress) {
final Configuration conf = this.getConf();
String interceptorClassName =
conf.get(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS);
DefaultRequestInterceptorREST interceptorInstance = null;
try {
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
if (DefaultRequestInterceptorREST.class
.isAssignableFrom(interceptorClass)) {
interceptorInstance = (DefaultRequestInterceptorREST) ReflectionUtils
.newInstance(interceptorClass, conf);
} else {
throw new YarnRuntimeException(
"Class: " + interceptorClassName + " not instance of "
+ DefaultRequestInterceptorREST.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate ApplicationMasterRequestInterceptor: "
+ interceptorClassName,
e);
}
interceptorInstance.setWebAppAddress(webAppAddress);
interceptorInstance.setSubClusterId(subClusterId);
interceptors.put(subClusterId, interceptorInstance);
return interceptorInstance;
}
@VisibleForTesting
protected DefaultRequestInterceptorREST getOrCreateInterceptorForSubCluster(
SubClusterId subClusterId, String webAppAddress) {
DefaultRequestInterceptorREST interceptor =
getInterceptorForSubCluster(subClusterId);
if (interceptor == null) {
interceptor = createInterceptorForSubCluster(subClusterId, webAppAddress);
}
return interceptor;
}
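As an illustration (not part of this patch), the reflection-based factory above will accept any subclass of DefaultRequestInterceptorREST configured through the yarn.router.webapp.default-interceptor-class key; a hypothetical example, with package and class names made up for this sketch:

package org.example;

import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.router.webapp.DefaultRequestInterceptorREST;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical per-SubCluster interceptor that logs before delegating. */
public class LoggingRequestInterceptorREST
    extends DefaultRequestInterceptorREST {
  private static final Logger LOG =
      LoggerFactory.getLogger(LoggingRequestInterceptorREST.class);

  @Override
  public AppInfo getApp(HttpServletRequest hsr, String appId,
      Set<String> unselectedFields) {
    // getSubClusterId() is the protected accessor added by this patch.
    LOG.info("Routing getApp({}) to SubCluster {}", appId, getSubClusterId());
    return super.getApp(hsr, appId, unselectedFields);
  }
}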
/**
* The YARN Router forwards every getNewApplication request to any RM. During
* this operation there will be no communication with the State Store. The
* Router will forward the request to any SubCluster and will retry the
* request on up to #numSubmitRetries different SubClusters. The
* SubClusters are randomly chosen from the active ones.
* <p>
* Possible failures and behaviors:
* <p>
* Client: identical behavior as {@code RMWebServices}.
* <p>
* Router: the Client will time out and resubmit.
* <p>
* ResourceManager: the Router will time out and contact another RM.
* <p>
* StateStore: not in the execution.
*/
@Override
public Response createNewApplication(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
Map<SubClusterId, SubClusterInfo> subClustersActive;
try {
subClustersActive = federationFacade.getSubClusters(true);
} catch (YarnException e) {
return Response.status(Status.INTERNAL_SERVER_ERROR)
.entity(e.getLocalizedMessage()).build();
}
List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId;
try {
subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
} catch (YarnException e) {
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(e.getLocalizedMessage()).build();
}
LOG.debug(
"getNewApplication try #" + i + " on SubCluster " + subClusterId);
DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(subClusterId,
subClustersActive.get(subClusterId).getRMWebServiceAddress());
Response response = null;
try {
response = interceptor.createNewApplication(hsr);
} catch (Exception e) {
LOG.warn("Unable to create a new ApplicationId in SubCluster "
+ subClusterId.getId(), e);
}
if (response != null && response.getStatus() == 200) {
return response;
} else {
// Empty response from the ResourceManager.
// Blacklist this subcluster for this request.
blacklist.add(subClusterId);
}
}
String errMsg = "Fail to create a new application.";
LOG.error(errMsg);
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
}
/**
* Today, in YARN there is no validation of the submitted applicationId.
* <p>
* Base scenarios:
* <p>
* The Client submits an application to the Router. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into
* StateStore with the selected SubCluster (e.g. SC1) and the appId. The
* State Store replies with the selected SubCluster (e.g. SC1). The Router
* submits the request to the selected SubCluster.
* <p>
* In case of State Store failure:
* <p>
* The client submits an application to the Router. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into State
* Store with the selected SubCluster (e.g. SC1) and the appId. Because the
* State Store is down, the Router times out and retries depending on the
* FederationFacade settings. The Router replies to the client with an error
* message.
* <p>
* If State Store fails after inserting the tuple: identical behavior as
* {@code RMWebServices}.
* <p>
* In case of Router failure:
* <p>
* Scenario 1: Crash before submission to the ResourceManager
* <p>
* The Client submits an application to the Router. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into State
* Store with the selected SubCluster (e.g. SC1) and the appId. The Router
* crashes. The Client times out and resubmits the application. The Router
* selects one SubCluster to forward the request. The Router inserts a tuple
* into State Store with the selected SubCluster (e.g. SC2) and the appId.
* Because the tuple is already inserted in the State Store, the State Store
* returns the previously selected SubCluster (e.g. SC1). The Router submits
* the request
* to the selected SubCluster (e.g. SC1).
* <p>
* Scenario 2: Crash after submission to the ResourceManager
* <p>
* The Client submits an application to the Router. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into State
* Store with the selected SubCluster (e.g. SC1) and the appId. The Router
* submits the request to the selected SubCluster. The Router crashes. The
* Client times out and resubmits the application. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into State
* Store with the selected SubCluster (e.g. SC2) and the appId. The State
* Store replies with the selected SubCluster (e.g. SC1). The Router submits
* the request to the selected SubCluster (e.g. SC1). When a client re-submits
* the same application to the same RM, the RM does not raise an exception and
* replies with an operation-successful message.
* <p>
* In case of Client failure: identical behavior as {@code RMWebServices}.
* <p>
* In case of ResourceManager failure:
* <p>
* The Client submits an application to the Router. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into State
* Store with the selected SubCluster (e.g. SC1) and the appId. The Router
* submits the request to the selected SubCluster. The entire SubCluster is
* down (all the RMs in HA are down) or the master RM is not reachable. The
* Router times out. The Router selects a new SubCluster to forward the
* request. The Router updates the tuple in the State Store with the newly
* selected SubCluster (e.g. SC2) and the appId. The State Store replies with
* an OK answer. The
* Router submits the request to the selected SubCluster (e.g. SC2).
*/
@Override
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
if (newApp == null || newApp.getApplicationId() == null) {
String errMsg = "Missing ApplicationSubmissionContextInfo or "
+ "applicationSubmissionContex information.";
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}
ApplicationId applicationId = null;
try {
applicationId = ApplicationId.fromString(newApp.getApplicationId());
} catch (IllegalArgumentException e) {
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
.build();
}
List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
for (int i = 0; i < numSubmitRetries; ++i) {
ApplicationSubmissionContext context =
RMWebAppUtil.createAppSubmissionContext(newApp, this.getConf());
SubClusterId subClusterId = null;
try {
subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
} catch (YarnException e) {
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(e.getLocalizedMessage()).build();
}
LOG.info("submitApplication appId" + applicationId + " try #" + i
+ " on SubCluster " + subClusterId);
ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
if (i == 0) {
try {
// persist the mapping of applicationId and the subClusterId which has
// been selected as its home
subClusterId =
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
String errMsg = "Unable to insert the ApplicationId " + applicationId
+ " into the FederationStateStore";
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(errMsg + " " + e.getLocalizedMessage()).build();
}
} else {
try {
// update the mapping of applicationId and the home subClusterId to
// the new subClusterId we have selected
federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
String errMsg = "Unable to update the ApplicationId " + applicationId
+ " into the FederationStateStore";
SubClusterId subClusterIdInStateStore;
try {
subClusterIdInStateStore =
federationFacade.getApplicationHomeSubCluster(applicationId);
} catch (YarnException e1) {
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(e1.getLocalizedMessage()).build();
}
if (subClusterId.equals(subClusterIdInStateStore)) {
LOG.info("Application " + applicationId
+ " already submitted on SubCluster " + subClusterId);
} else {
return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg)
.build();
}
}
}
SubClusterInfo subClusterInfo;
try {
subClusterInfo = federationFacade.getSubCluster(subClusterId);
} catch (YarnException e) {
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(e.getLocalizedMessage()).build();
}
Response response = null;
try {
response = getOrCreateInterceptorForSubCluster(subClusterId,
subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp,
hsr);
} catch (Exception e) {
LOG.warn("Unable to submit the application " + applicationId
+ "to SubCluster " + subClusterId.getId(), e);
}
if (response != null && response.getStatus() == 202) {
LOG.info("Application " + context.getApplicationName() + " with appId "
+ applicationId + " submitted on " + subClusterId);
return response;
} else {
// Empty response from the ResourceManager.
// Blacklist this subcluster for this request.
blacklist.add(subClusterId);
}
}
String errMsg = "Application " + newApp.getApplicationName()
+ " with appId " + applicationId + " failed to be submitted.";
LOG.error(errMsg);
return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build();
}
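For context (not part of this patch), a client-side sketch of the two-step flow described in the javadoc above, assuming the Router proxies the standard RM REST paths (/ws/v1/cluster/apps/new-application and /ws/v1/cluster/apps) on a hypothetical web address; in a real client the application-id in the submission body would be parsed from the new-application response, and the body would carry the full submission context:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RouterRestClientSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical Router web address.
    String router = "http://router-host:8089";
    // Step 1: getNewApplication - the Router forwards this to a random active
    // SubCluster and retries on other SubClusters if the call fails.
    HttpURLConnection newApp = (HttpURLConnection) new URL(
        router + "/ws/v1/cluster/apps/new-application").openConnection();
    newApp.setRequestMethod("POST");
    System.out.println("new-application -> " + newApp.getResponseCode());
    // Step 2: submitApplication - the Router chooses the home SubCluster via
    // the policy facade, records it in the FederationStateStore and forwards.
    // Minimal illustrative body only; a real submission needs more fields.
    String body = "{\"application-id\":\"application_1484231633049_0001\","
        + "\"application-name\":\"sketch\"}";
    HttpURLConnection submit = (HttpURLConnection) new URL(
        router + "/ws/v1/cluster/apps").openConnection();
    submit.setRequestMethod("POST");
    submit.setRequestProperty("Content-Type", "application/json");
    submit.setDoOutput(true);
    try (OutputStream out = submit.getOutputStream()) {
      out.write(body.getBytes(StandardCharsets.UTF_8));
    }
    // 202 (ACCEPTED) is the status a successful submission would return.
    System.out.println("submit -> " + submit.getResponseCode());
  }
}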
/**
* The YARN Router will forward the request to the YARN RM in which the AM of
* the application is running.
* <p>
* Possible failures and behaviors:
* <p>
* Client: identical behavior as {@code RMWebServices}.
* <p>
* Router: the Client will time out and resubmit the request.
* <p>
* ResourceManager: the Router will time out and the call will fail.
* <p>
* State Store: the Router will time out and retry depending on the
* FederationFacade settings - if the failure happened before the select
* operation.
*/
@Override
public AppInfo getApp(HttpServletRequest hsr, String appId,
Set<String> unselectedFields) {
ApplicationId applicationId = null;
try {
applicationId = ApplicationId.fromString(appId);
} catch (IllegalArgumentException e) {
return null;
}
SubClusterInfo subClusterInfo = null;
SubClusterId subClusterId = null;
try {
subClusterId =
federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId == null) {
return null;
}
subClusterInfo = federationFacade.getSubCluster(subClusterId);
} catch (YarnException e) {
return null;
}
return getOrCreateInterceptorForSubCluster(subClusterId,
subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId,
unselectedFields);
}
/**
* The YARN Router will forward the request to the YARN RM in which the AM of
* the application is running.
* <p>
* Possible failures and behaviors:
* <p>
* Client: identical behavior as {@code RMWebServices}.
* <p>
* Router: the Client will time out and resubmit the request.
* <p>
* ResourceManager: the Router will time out and the call will fail.
* <p>
* State Store: the Router will time out and retry depending on the
* FederationFacade settings - if the failure happened before the select
* operation.
*/
@Override
public Response updateAppState(AppState targetState, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
ApplicationId applicationId = null;
try {
applicationId = ApplicationId.fromString(appId);
} catch (IllegalArgumentException e) {
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
.build();
}
SubClusterId subClusterId =
federationFacade.getApplicationHomeSubCluster(applicationId);
SubClusterInfo subClusterInfo =
federationFacade.getSubCluster(subClusterId);
return getOrCreateInterceptorForSubCluster(subClusterId,
subClusterInfo.getRMWebServiceAddress()).updateAppState(targetState,
hsr, appId);
}
@Override
public ClusterInfo get() {
return getClusterInfo();
}
@Override
public ClusterInfo getClusterInfo() {
throw new NotImplementedException();
}
@Override
public ClusterMetricsInfo getClusterMetricsInfo() {
throw new NotImplementedException();
}
@Override
public SchedulerTypeInfo getSchedulerInfo() {
throw new NotImplementedException();
}
@Override
public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
throws IOException {
throw new NotImplementedException();
}
@Override
public NodesInfo getNodes(String states) {
throw new NotImplementedException();
}
@Override
public NodeInfo getNode(String nodeId) {
throw new NotImplementedException();
}
@Override
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
throw new NotImplementedException();
}
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time) {
throw new NotImplementedException();
}
@Override
public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
Set<String> stateQueries, Set<String> typeQueries) {
throw new NotImplementedException();
}
@Override
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
Set<String> statesQuery, String finalStatusQuery, String userQuery,
String queueQuery, String count, String startedBegin, String startedEnd,
String finishBegin, String finishEnd, Set<String> applicationTypes,
Set<String> applicationTags, Set<String> unselectedFields) {
throw new NotImplementedException();
}
@Override
public AppState getAppState(HttpServletRequest hsr, String appId)
throws AuthorizationException {
throw new NotImplementedException();
}
@Override
public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
throws IOException {
throw new NotImplementedException();
}
@Override
public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
throws IOException {
throw new NotImplementedException();
}
@Override
public Response replaceLabelsOnNodes(NodeToLabelsEntryList newNodeToLabels,
HttpServletRequest hsr) throws IOException {
throw new NotImplementedException();
}
@Override
public Response replaceLabelsOnNode(Set<String> newNodeLabelsName,
HttpServletRequest hsr, String nodeId) throws Exception {
throw new NotImplementedException();
}
@Override
public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr)
throws IOException {
throw new NotImplementedException();
}
@Override
public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
HttpServletRequest hsr) throws Exception {
throw new NotImplementedException();
}
@Override
public Response removeFromCluserNodeLabels(Set<String> oldNodeLabels,
HttpServletRequest hsr) throws Exception {
throw new NotImplementedException();
}
@Override
public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
throws IOException {
throw new NotImplementedException();
}
@Override
public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
throws AuthorizationException {
throw new NotImplementedException();
}
@Override
public Response updateApplicationPriority(AppPriority targetPriority,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
throw new NotImplementedException();
}
@Override
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
throws AuthorizationException {
throw new NotImplementedException();
}
@Override
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
throw new NotImplementedException();
}
@Override
public Response postDelegationToken(DelegationToken tokenData,
HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException, Exception {
throw new NotImplementedException();
}
@Override
public Response postDelegationTokenExpiration(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
throw new NotImplementedException();
}
@Override
public Response cancelDelegationToken(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
throw new NotImplementedException();
}
@Override
public Response createNewReservation(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
throw new NotImplementedException();
}
@Override
public Response submitReservation(ReservationSubmissionRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
throw new NotImplementedException();
}
@Override
public Response updateReservation(ReservationUpdateRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
throw new NotImplementedException();
}
@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
throw new NotImplementedException();
}
@Override
public Response listReservation(String queue, String reservationId,
long startTime, long endTime, boolean includeResourceAllocations,
HttpServletRequest hsr) throws Exception {
throw new NotImplementedException();
}
@Override
public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId,
String type) throws AuthorizationException {
throw new NotImplementedException();
}
@Override
public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
throws AuthorizationException {
throw new NotImplementedException();
}
@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
throw new NotImplementedException();
}
@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
throw new NotImplementedException();
}
@Override
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
throw new NotImplementedException();
}
@Override
public ContainersInfo getContainers(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
throw new NotImplementedException();
}
@Override
public ContainerInfo getContainer(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId,
String containerId) {
throw new NotImplementedException();
}
@Override
public void setNextInterceptor(RESTRequestInterceptor next) {
throw new YarnRuntimeException("setNextInterceptor is being called on "
+ "FederationInterceptorREST, which should be the last one "
+ "in the chain. Check if the interceptor pipeline configuration "
+ "is correct");
}
}
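Not part of this patch - a minimal sketch of wiring this interceptor as the last element of the Router webapp pipeline (the contract enforced by setNextInterceptor above), using keys that the tests and the Federation documentation in this commit reference:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RouterPipelineConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
    // FederationInterceptorREST must be the last interceptor in the chain.
    conf.set(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
        "org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST");
    // Number of submission retries used by FederationInterceptorREST.
    conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, 3);
    System.out.println(conf.get(
        YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE));
  }
}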


@@ -27,6 +27,7 @@ import java.security.PrivilegedExceptionAction;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -75,22 +76,8 @@ public abstract class BaseRouterWebServicesTest {
private RouterWebServices routerWebService;
@Before
public void setup() {
conf = new YarnConfiguration();
String mockPassThroughInterceptorClass =
PassThroughRESTRequestInterceptor.class.getName();
// Create a request interceptor pipeline for testing. The last one in the
// chain will call the mock resource manager. The others in the chain will
// simply forward it to the next one in the chain
conf.set(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + mockPassThroughInterceptorClass + ","
+ MockRESTRequestInterceptor.class.getName());
conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
TEST_MAX_CACHE_SIZE);
public void setUp() {
this.conf = createConfiguration();
router = spy(new Router());
Mockito.doNothing().when(router).startWepApp();
@@ -101,6 +88,24 @@ public abstract class BaseRouterWebServicesTest {
router.start();
}
protected YarnConfiguration createConfiguration() {
YarnConfiguration config = new YarnConfiguration();
String mockPassThroughInterceptorClass =
PassThroughRESTRequestInterceptor.class.getName();
// Create a request interceptor pipeline for testing. The last one in the
// chain will call the mock resource manager. The others in the chain will
// simply forward it to the next one in the chain
config.set(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + mockPassThroughInterceptorClass + ","
+ MockRESTRequestInterceptor.class.getName());
config.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
TEST_MAX_CACHE_SIZE);
return config;
}
@After
public void tearDown() {
if (router != null) {
@@ -108,6 +113,14 @@ public abstract class BaseRouterWebServicesTest {
}
}
public void setUpConfig() {
this.conf = createConfiguration();
}
protected Configuration getConf() {
return this.conf;
}
protected RouterWebServices getRouterWebServices() {
Assert.assertNotNull(this.routerWebService);
return this.routerWebService;


@@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.net.ConnectException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This class mocks the RESTRequestInterceptor.
*/
public class MockDefaultRequestInterceptorREST
extends DefaultRequestInterceptorREST {
private static final Logger LOG =
LoggerFactory.getLogger(MockDefaultRequestInterceptorREST.class);
final private AtomicInteger applicationCounter = new AtomicInteger(0);
// True if the Mock RM is running, false otherwise.
// This flag allows us to write tests for specific scenarios, such as the
// YARN RM being down, e.g. due to a network issue or failover.
private boolean isRunning = true;
private HashSet<ApplicationId> applicationMap = new HashSet<>();
private void validateRunning() throws ConnectException {
if (!isRunning) {
throw new ConnectException("RM is stopped");
}
}
@Override
public Response createNewApplication(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
validateRunning();
ApplicationId applicationId =
ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
applicationCounter.incrementAndGet());
NewApplication appId =
new NewApplication(applicationId.toString(), new ResourceInfo());
return Response.status(Status.OK).entity(appId).build();
}
@Override
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
validateRunning();
ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
LOG.info("Application submitted: " + appId);
applicationMap.add(appId);
return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, "")
.entity(getSubClusterId()).build();
}
@Override
public AppInfo getApp(HttpServletRequest hsr, String appId,
Set<String> unselectedFields) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
return new AppInfo();
}
@Override
public Response updateAppState(AppState targetState, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
validateRunning();
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.remove(applicationId)) {
throw new ApplicationNotFoundException(
"Trying to kill an absent application: " + appId);
}
if (targetState == null) {
return Response.status(Status.BAD_REQUEST).build();
}
LOG.info("Force killing application: " + appId);
AppState ret = new AppState();
ret.setState(targetState.toString());
return Response.status(Status.OK).entity(ret).build();
}
public void setSubClusterId(int subClusterId) {
setSubClusterId(SubClusterId.newInstance(Integer.toString(subClusterId)));
}
public boolean isRunning() {
return isRunning;
}
public void setRunning(boolean runningMode) {
this.isRunning = runningMode;
}
}


@@ -0,0 +1,379 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.core.Response;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Extends the {@code BaseRouterWebServicesTest} and overrides methods in order
* to use the {@code RouterWebServices} pipeline test cases for testing the
* {@code FederationInterceptorREST} class. The tests for
* {@code RouterWebServices} have been written cleverly so that they can be
* reused to validate different request interceptor chains.
*/
public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestFederationInterceptorREST.class);
private final static int NUM_SUBCLUSTER = 4;
private static final int BAD_REQUEST = 400;
private static final int ACCEPTED = 202;
private static String user = "test-user";
private TestableFederationInterceptorREST interceptor;
private MemoryFederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreUtil;
private List<SubClusterId> subClusters;
@Override
public void setUp() {
super.setUpConfig();
interceptor = new TestableFederationInterceptorREST();
stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
this.getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
interceptor.setConf(this.getConf());
interceptor.init(user);
subClusters = new ArrayList<>();
try {
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
stateStoreUtil.registerSubCluster(sc);
subClusters.add(sc);
}
} catch (YarnException e) {
LOG.error(e.getMessage());
Assert.fail();
}
}
@Override
public void tearDown() {
interceptor.shutdown();
super.tearDown();
}
@Override
protected YarnConfiguration createConfiguration() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
conf.set(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
MockDefaultRequestInterceptorREST.class.getName());
String mockPassThroughInterceptorClass =
PassThroughRESTRequestInterceptor.class.getName();
// Create a request interceptor pipeline for testing. The last one in the
// chain is the federation interceptor that calls the mock resource manager.
// The others in the chain will simply forward it to the next one in the
// chain
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + ","
+ TestableFederationInterceptorREST.class.getName());
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
UniformBroadcastPolicyManager.class.getName());
// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
return conf;
}
/**
* This test validates the correctness of GetNewApplication. The returned
* ApplicationId has to belong to one of the SubClusters in the cluster.
*/
@Test
public void testGetNewApplication()
throws YarnException, IOException, InterruptedException {
Response response = interceptor.createNewApplication(null);
Assert.assertNotNull(response);
NewApplication ci = (NewApplication) response.getEntity();
Assert.assertNotNull(ci);
ApplicationId appId = ApplicationId.fromString(ci.getApplicationId());
Assert.assertTrue(appId.getClusterTimestamp() < NUM_SUBCLUSTER);
Assert.assertTrue(appId.getClusterTimestamp() >= 0);
}
/**
* This test validates the correctness of SubmitApplication. The application
* has to be submitted to one of the SubClusters in the cluster.
*/
@Test
public void testSubmitApplication()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
Response response = interceptor.submitApplication(context, null);
Assert.assertEquals(ACCEPTED, response.getStatus());
SubClusterId ci = (SubClusterId) response.getEntity();
Assert.assertNotNull(response);
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
Assert.assertNotNull(scIdResult);
Assert.assertTrue(subClusters.contains(scIdResult));
Assert.assertEquals(ci, scIdResult);
}
/**
* This test validates the correctness of SubmitApplication in case of
* multiple submissions. The first retry has to be submitted to the same
* SubCluster as the first attempt.
*/
@Test
public void testSubmitApplicationMultipleSubmission()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// First attempt
Response response = interceptor.submitApplication(context, null);
Assert.assertNotNull(response);
Assert.assertEquals(ACCEPTED, response.getStatus());
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
Assert.assertNotNull(scIdResult);
// First retry
response = interceptor.submitApplication(context, null);
Assert.assertNotNull(response);
Assert.assertEquals(ACCEPTED, response.getStatus());
SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId);
Assert.assertNotNull(scIdResult2);
Assert.assertEquals(scIdResult, scIdResult2);
}
/**
* This test validates the correctness of SubmitApplication in case of empty
* request.
*/
@Test
public void testSubmitApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException {
// ApplicationSubmissionContextInfo null
Response response = interceptor.submitApplication(null, null);
Assert.assertEquals(BAD_REQUEST, response.getStatus());
// ApplicationSubmissionContextInfo empty
response = interceptor
.submitApplication(new ApplicationSubmissionContextInfo(), null);
Assert.assertEquals(BAD_REQUEST, response.getStatus());
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
response = interceptor.submitApplication(context, null);
Assert.assertEquals(BAD_REQUEST, response.getStatus());
}
/**
* This test validates the correctness of SubmitApplication in case of an
* application id in the wrong format.
*/
@Test
public void testSubmitApplicationWrongFormat()
throws YarnException, IOException, InterruptedException {
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId("Application_wrong_id");
Response response = interceptor.submitApplication(context, null);
Assert.assertEquals(BAD_REQUEST, response.getStatus());
}
/**
* This test validates the correctness of ForceKillApplication in case the
* application exists in the cluster.
*/
@Test
public void testForceKillApplication()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Submit the application we are going to kill later
Response response = interceptor.submitApplication(context, null);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
AppState appState = new AppState("KILLED");
Response responseKill =
interceptor.updateAppState(appState, null, appId.toString());
Assert.assertNotNull(responseKill);
}
/**
* This test validates the correctness of ForceKillApplication in case the
* application does not exist in the StateStore.
*/
@Test
public void testForceKillApplicationNotExists()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
AppState appState = new AppState("KILLED");
try {
interceptor.updateAppState(appState, null, appId.toString());
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().equals("Application " + appId + " does not exist"));
}
}
/**
* This test validates the correctness of ForceKillApplication in case of
* an application id in the wrong format.
*/
@Test
public void testForceKillApplicationWrongFormat()
throws YarnException, IOException, InterruptedException {
AppState appState = new AppState("KILLED");
Response response =
interceptor.updateAppState(appState, null, "Application_wrong_id");
Assert.assertEquals(BAD_REQUEST, response.getStatus());
}
/**
* This test validates the correctness of ForceKillApplication in case of
* empty request.
*/
@Test
public void testForceKillApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Submit the application we are going to kill later
interceptor.submitApplication(context, null);
Response response =
interceptor.updateAppState(null, null, appId.toString());
Assert.assertEquals(BAD_REQUEST, response.getStatus());
}
/**
* This test validates the correctness of GetApplicationReport in case the
* application exists in the cluster.
*/
@Test
public void testGetApplicationReport()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Submit the application we want the report later
Response response = interceptor.submitApplication(context, null);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
AppInfo responseGet = interceptor.getApp(null, appId.toString(), null);
Assert.assertNotNull(responseGet);
}
/**
* This test validates the correctness of GetApplicationReport in case the
* application does not exist in the StateStore.
*/
@Test
public void testGetApplicationNotExists()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
AppInfo response = interceptor.getApp(null, appId.toString(), null);
Assert.assertNull(response);
}
/**
* This test validates the correctness of GetApplicationReport in case of
* an application id in the wrong format.
*/
@Test
public void testGetApplicationWrongFormat()
throws YarnException, IOException, InterruptedException {
AppInfo response = interceptor.getApp(null, "Application_wrong_id", null);
Assert.assertNull(response);
}
}


@@ -0,0 +1,274 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.ws.rs.core.Response;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Extends the {@code BaseRouterWebServicesTest} and overrides methods in order
* to use the {@code RouterWebServices} pipeline test cases for testing the
* {@code FederationInterceptorREST} class. The tests for
* {@code RouterWebServices} have been written cleverly so that they can be
* reused to validate different request interceptor chains.
* <p>
* It tests the cases in which SubClusters are down, exercising the Router's
* retry logic. All the tests use 1 good SubCluster and 2 bad ones.
*/
public class TestFederationInterceptorRESTRetry
extends BaseRouterWebServicesTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestFederationInterceptorRESTRetry.class);
private static final int SERVICE_UNAVAILABLE = 503;
private static final int ACCEPTED = 202;
private static final int OK = 200;
// running and registered
private static SubClusterId good;
// registered but not running
private static SubClusterId bad1;
private static SubClusterId bad2;
private static List<SubClusterId> scs = new ArrayList<SubClusterId>();
private TestableFederationInterceptorREST interceptor;
private MemoryFederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreUtil;
private String user = "test-user";
@Override
public void setUp() {
super.setUpConfig();
interceptor = new TestableFederationInterceptorREST();
stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
interceptor.setConf(this.getConf());
interceptor.init(user);
// Create SubClusters
good = SubClusterId.newInstance("0");
bad1 = SubClusterId.newInstance("1");
bad2 = SubClusterId.newInstance("2");
scs.add(good);
scs.add(bad1);
scs.add(bad2);
// The mock RM will not start in these SubClusters; this simulates a
// SubCluster being down.
interceptor.registerBadSubCluster(bad1);
interceptor.registerBadSubCluster(bad2);
}
@Override
public void tearDown() {
interceptor.shutdown();
super.tearDown();
}
private void setupCluster(List<SubClusterId> scsToRegister)
throws YarnException {
try {
// Clean up the StateStore before every test
stateStoreUtil.deregisterAllSubClusters();
for (SubClusterId sc : scsToRegister) {
stateStoreUtil.registerSubCluster(sc);
}
} catch (YarnException e) {
LOG.error(e.getMessage());
Assert.fail();
}
}
@Override
protected YarnConfiguration createConfiguration() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
conf.set(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
MockDefaultRequestInterceptorREST.class.getName());
String mockPassThroughInterceptorClass =
PassThroughClientRequestInterceptor.class.getName();
// Create a request interceptor pipeline for testing. The last one in the
// chain is the federation interceptor that calls the mock resource manager.
// The others in the chain will simply forward it to the next one in the
// chain
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + ","
+ TestableFederationClientInterceptor.class.getName());
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
UniformBroadcastPolicyManager.class.getName());
// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
return conf;
}
/**
* This test validates the correctness of GetNewApplication in case the
* cluster is composed of only 1 bad SubCluster.
*/
@Test
public void testGetNewApplicationOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
Response response = interceptor.createNewApplication(null);
Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
response.getEntity());
}
/**
* This test validates the correctness of GetNewApplication in case the
* cluster is composed of only 2 bad SubClusters.
*/
@Test
public void testGetNewApplicationTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
Response response = interceptor.createNewApplication(null);
Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
response.getEntity());
}
/**
* This test validates the correctness of GetNewApplication in case the
* cluster is composed of only 1 bad SubCluster and 1 good one.
*/
@Test
public void testGetNewApplicationOneBadOneGood()
throws YarnException, IOException, InterruptedException {
System.out.println("Test getNewApplication with one bad, one good SC");
setupCluster(Arrays.asList(good, bad2));
Response response = interceptor.createNewApplication(null);
Assert.assertEquals(OK, response.getStatus());
NewApplication newApp = (NewApplication) response.getEntity();
ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
Assert.assertEquals(Integer.parseInt(good.getId()),
appId.getClusterTimestamp());
}
/**
* This test validates the correctness of SubmitApplication in case the
* cluster is composed of only 1 bad SubCluster.
*/
@Test
public void testSubmitApplicationOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
Response response = interceptor.submitApplication(context, null);
Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
response.getEntity());
}
/**
* This test validates the correctness of SubmitApplication in case the
* cluster is composed of only 2 bad SubClusters.
*/
@Test
public void testSubmitApplicationTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
Response response = interceptor.submitApplication(context, null);
Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
response.getEntity());
}
/**
* This test validates the correctness of SubmitApplication in case the
* cluster is composed of only 1 bad SubCluster and a good one.
*/
@Test
public void testSubmitApplicationOneBadOneGood()
throws YarnException, IOException, InterruptedException {
System.out.println("Test submitApplication with one bad, one good SC");
setupCluster(Arrays.asList(good, bad2));
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
Response response = interceptor.submitApplication(context, null);
Assert.assertEquals(ACCEPTED, response.getStatus());
Assert.assertEquals(good,
stateStore
.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId))
.getApplicationHomeSubCluster().getHomeSubCluster());
}
}


@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
/**
* Extends the FederationInterceptorREST and overrides methods to provide a
* testable implementation of FederationInterceptorREST.
*/
public class TestableFederationInterceptorREST
extends FederationInterceptorREST {
private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
/**
* For testing purposes, some subclusters have to be down to simulate
* particular scenarios such as RM failover or network issues. For this reason
* we keep track of these bad subclusters. This method makes the subcluster
* unusable.
*
* @param badSC the subcluster to make unusable
*/
protected void registerBadSubCluster(SubClusterId badSC) {
// Add the bad SubCluster to the cache so that we can later stop it
getOrCreateInterceptorForSubCluster(badSC, "test");
badSubCluster.add(badSC);
MockDefaultRequestInterceptorREST interceptor =
(MockDefaultRequestInterceptorREST) super.getInterceptorForSubCluster(
badSC);
interceptor.setRunning(false);
}
}


@@ -216,7 +216,7 @@ Optional:
|`yarn.router.submit.retry` | `3` | The number of retries in the router before we give up. |
|`yarn.federation.statestore.max-connections` | `10` | This is the maximum number of parallel connections each Router makes to the state-store. |
|`yarn.federation.cache-ttl.secs` | `60` | The Router caches information, and this is the time to live before the cache is invalidated. |
|`yarn.router.webapp.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via the REST interface. The last step of this pipeline must be the Federation Interceptor REST. |
###ON NMs: