YARN-5412. Create a proxy chain for ResourceManager REST API in the Router. (Contributed by Giovanni Matteo Fumarola via curino)

This commit is contained in:
Carlo Curino 2017-07-27 14:34:45 -07:00
parent 40453879ec
commit b6240b92ab
28 changed files with 5239 additions and 71 deletions
hadoop-yarn-project/hadoop-yarn
hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf
hadoop-yarn-common/src/main
java/org/apache/hadoop/yarn/webapp/util
resources
hadoop-yarn-server

View File

@ -2682,6 +2682,30 @@ public class YarnConfiguration extends Configuration {
ROUTER_PREFIX + "submit.retry";
public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;
public static final String ROUTER_WEBAPP_PREFIX = ROUTER_PREFIX + "webapp.";
/** The address of the Router web application. */
public static final String ROUTER_WEBAPP_ADDRESS =
ROUTER_WEBAPP_PREFIX + "address";
public static final int DEFAULT_ROUTER_WEBAPP_PORT = 8089;
public static final String DEFAULT_ROUTER_WEBAPP_ADDRESS =
"0.0.0.0:" + DEFAULT_ROUTER_WEBAPP_PORT;
/** The https address of the Router web application. */
public static final String ROUTER_WEBAPP_HTTPS_ADDRESS =
ROUTER_WEBAPP_PREFIX + "https.address";
public static final int DEFAULT_ROUTER_WEBAPP_HTTPS_PORT = 8091;
public static final String DEFAULT_ROUTER_WEBAPP_HTTPS_ADDRESS =
"0.0.0.0:" + DEFAULT_ROUTER_WEBAPP_HTTPS_PORT;
public static final String ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE =
ROUTER_WEBAPP_PREFIX + "interceptor-class.pipeline";
public static final String DEFAULT_ROUTER_WEBAPP_INTERCEPTOR_CLASS =
"org.apache.hadoop.yarn.server.router.webapp."
+ "DefaultRequestInterceptorREST";
////////////////////////////////
// Other Configs
////////////////////////////////

View File

@ -129,6 +129,20 @@ public class WebAppUtils {
return getRMWebAppURLWithoutScheme(conf, false);
}
public static String getRouterWebAppURLWithScheme(Configuration conf) {
return getHttpSchemePrefix(conf) + getRouterWebAppURLWithoutScheme(conf);
}
public static String getRouterWebAppURLWithoutScheme(Configuration conf) {
if (YarnConfiguration.useHttps(conf)) {
return conf.get(YarnConfiguration.ROUTER_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_HTTPS_ADDRESS);
} else {
return conf.get(YarnConfiguration.ROUTER_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_ADDRESS);
}
}
public static List<String> getProxyHostsAndPortsForAmFilter(
Configuration conf) {
List<String> addrs = new ArrayList<String>();

View File

@ -3214,4 +3214,34 @@
<value>user-group</value>
</property>
<property>
<description>
The comma separated list of class names that implement the
RequestInterceptor interface. This is used by the RouterWebServices
to create the request processing pipeline for users.
</description>
<name>yarn.router.webapp.interceptor-class.pipeline</name>
<value>org.apache.hadoop.yarn.server.router.webapp.DefaultRequestInterceptorREST</value>
</property>
<property>
<description>
The http address of the Router web application.
If only a host is provided as the value,
the webapp will be served on a random port.
</description>
<name>yarn.router.webapp.address</name>
<value>0.0.0.0:8089</value>
</property>
<property>
<description>
The https address of the Router web application.
If only a host is provided as the value,
the webapp will be served on a random port.
</description>
<name> yarn.router.webapp.https.address</name>
<value>0.0.0.0:8091</value>
</property>
</configuration>

View File

@ -168,6 +168,12 @@ public final class RMWSConsts {
*/
public static final String APPS_TIMEOUT = "/apps/{appid}/timeout";
/**
* Path for {@code RouterWebServices#getContainer}.
*/
public static final String GET_CONTAINER =
"/apps/{appid}/appattempts/{appattemptid}/containers/{containerid}";
// ----------------QueryParams for RMWebServiceProtocol----------------
public static final String TIME = "time";
@ -194,6 +200,15 @@ public final class RMWSConsts {
public static final String END_TIME = "end-time";
public static final String INCLUDE_RESOURCE = "include-resource-allocations";
public static final String TYPE = "type";
public static final String CONTAINERID = "containerid";
public static final String APPATTEMPTS = "appattempts";
public static final String TIMEOUTS = "timeouts";
public static final String PRIORITY = "priority";
public static final String TIMEOUT = "timeout";
public static final String ATTEMPTS = "appattempts";
public static final String GET_LABELS = "get-labels";
public static final String DESELECTS = "deSelects";
public static final String CONTAINERS = "containers";
private RMWSConsts() {
// not called

View File

@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.Principal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -341,4 +344,30 @@ public final class RMWebAppUtil {
logAggregationContextInfo.getLogAggregationPolicyClassName(),
logAggregationContextInfo.getLogAggregationPolicyParameters());
}
/**
* Helper method to retrieve the UserGroupInformation from the
* HttpServletRequest.
*
* @param hsr the servlet request
* @param usePrincipal true if we need to use the principal user, remote
* otherwise.
* @return the user group information of the caller.
**/
public static UserGroupInformation getCallerUserGroupInformation(
HttpServletRequest hsr, boolean usePrincipal) {
String remoteUser = hsr.getRemoteUser();
if (usePrincipal) {
Principal princ = hsr.getUserPrincipal();
remoteUser = princ == null ? null : princ.getName();
}
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
return callerUGI;
}
}

View File

@ -108,7 +108,7 @@ public interface RMWebServiceProtocol {
* This method dumps the scheduler logs for the time got in input, and it is
* reachable by using {@link RMWSConsts#SCHEDULER_LOGS}.
*
* @param time the period of time
* @param time the period of time. It is a FormParam.
* @param hsr the servlet request
* @return the result of the operation
* @throws IOException when it cannot create dump log file
@ -121,7 +121,7 @@ public interface RMWebServiceProtocol {
* reachable by using {@link RMWSConsts#NODES}.
*
* @see ApplicationClientProtocol#getClusterNodes
* @param states the states we want to filter
* @param states the states we want to filter. It is a QueryParam.
* @return all nodes in the cluster. If the states param is given, returns all
* nodes that are in the comma-separated list of states
*/
@ -131,7 +131,8 @@ public interface RMWebServiceProtocol {
* This method retrieves a specific node information, and it is reachable by
* using {@link RMWSConsts#NODES_NODEID}.
*
* @param nodeId the node we want to retrieve the information
* @param nodeId the node we want to retrieve the information. It is a
* PathParam.
* @return the information about the node in input
*/
NodeInfo getNode(String nodeId);
@ -142,19 +143,25 @@ public interface RMWebServiceProtocol {
*
* @see ApplicationClientProtocol#getApplications
* @param hsr the servlet request
* @param stateQuery right now the stateQuery is deprecated
* @param statesQuery filter the result by states
* @param finalStatusQuery filter the result by final states
* @param userQuery filter the result by user
* @param queueQuery filter the result by queue
* @param count set a limit of the result
* @param startedBegin filter the result by started begin time
* @param startedEnd filter the result by started end time
* @param finishBegin filter the result by finish begin time
* @param finishEnd filter the result by finish end time
* @param applicationTypes filter the result by types
* @param applicationTags filter the result by tags
* @param unselectedFields De-selected params to avoid from report
* @param stateQuery right now the stateQuery is deprecated. It is a
* QueryParam.
* @param statesQuery filter the result by states. It is a QueryParam.
* @param finalStatusQuery filter the result by final states. It is a
* QueryParam.
* @param userQuery filter the result by user. It is a QueryParam.
* @param queueQuery filter the result by queue. It is a QueryParam.
* @param count set a limit of the result. It is a QueryParam.
* @param startedBegin filter the result by started begin time. It is a
* QueryParam.
* @param startedEnd filter the result by started end time. It is a
* QueryParam.
* @param finishBegin filter the result by finish begin time. It is a
* QueryParam.
* @param finishEnd filter the result by finish end time. It is a QueryParam.
* @param applicationTypes filter the result by types. It is a QueryParam.
* @param applicationTags filter the result by tags. It is a QueryParam.
* @param unselectedFields De-selected params to avoid from report. It is a
* QueryParam.
* @return all apps in the cluster
*/
@SuppressWarnings("checkstyle:parameternumber")
@ -169,7 +176,8 @@ public interface RMWebServiceProtocol {
* reachable by using {@link RMWSConsts#SCHEDULER_ACTIVITIES}.
*
* @param hsr the servlet request
* @param nodeId the node we want to retrieve the activities
* @param nodeId the node we want to retrieve the activities. It is a
* QueryParam.
* @return all the activities in the specific node
*/
ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId);
@ -180,8 +188,10 @@ public interface RMWebServiceProtocol {
* {@link RMWSConsts#SCHEDULER_APP_ACTIVITIES}.
*
* @param hsr the servlet request
* @param appId the applicationId we want to retrieve the activities
* @param time for how long we want to retrieve the activities
* @param appId the applicationId we want to retrieve the activities. It is a
* QueryParam.
* @param time for how long we want to retrieve the activities. It is a
* QueryParam.
* @return all the activities about a specific app for a specific time
*/
AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId,
@ -192,8 +202,8 @@ public interface RMWebServiceProtocol {
* reachable by using {@link RMWSConsts#APP_STATISTICS}.
*
* @param hsr the servlet request
* @param stateQueries filter the result by states
* @param typeQueries filter the result by type names
* @param stateQueries filter the result by states. It is a QueryParam.
* @param typeQueries filter the result by type names. It is a QueryParam.
* @return the application's statistics for specific states and types
*/
ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
@ -205,8 +215,10 @@ public interface RMWebServiceProtocol {
*
* @see ApplicationClientProtocol#getApplicationReport
* @param hsr the servlet request
* @param appId the Id of the application we want the report
* @param unselectedFields De-selected param list to avoid from report
* @param appId the Id of the application we want the report. It is a
* PathParam.
* @param unselectedFields De-selected param list to avoid from report. It is
* a QueryParam.
* @return the app report for a specific application
*/
AppInfo getApp(HttpServletRequest hsr, String appId,
@ -217,7 +229,8 @@ public interface RMWebServiceProtocol {
* using {@link RMWSConsts#APPS_APPID_STATE}.
*
* @param hsr the servlet request
* @param appId the Id of the application we want the state
* @param appId the Id of the application we want the state. It is a
* PathParam.
* @return the state for a specific application
* @throws AuthorizationException if the user is not authorized
*/
@ -228,9 +241,10 @@ public interface RMWebServiceProtocol {
* This method updates the state of the app in input, and it is reachable by
* using {@link RMWSConsts#APPS_APPID_STATE}.
*
* @param targetState the target state for the app
* @param targetState the target state for the app. It is a content param.
* @param hsr the servlet request
* @param appId the Id of the application we want to update the state
* @param appId the Id of the application we want to update the state. It is a
* PathParam.
* @return Response containing the status code
* @throws AuthorizationException if the user is not authorized to invoke this
* method
@ -259,7 +273,7 @@ public interface RMWebServiceProtocol {
* cluster, and it is reachable by using {@link RMWSConsts#LABEL_MAPPINGS}.
*
* @see ApplicationClientProtocol#getLabelsToNodes
* @param labels filter the result by node labels
* @param labels filter the result by node labels. It is a QueryParam.
* @return all the nodes within multiple node labels
* @throws IOException if an IOException happened
*/
@ -270,7 +284,7 @@ public interface RMWebServiceProtocol {
* reachable by using {@link RMWSConsts#REPLACE_NODE_TO_LABELS}.
*
* @see ResourceManagerAdministrationProtocol#replaceLabelsOnNode
* @param newNodeToLabels the list of new labels
* @param newNodeToLabels the list of new labels. It is a content param.
* @param hsr the servlet request
* @return Response containing the status code
* @throws Exception if an exception happened
@ -283,9 +297,10 @@ public interface RMWebServiceProtocol {
* reachable by using {@link RMWSConsts#NODES_NODEID_REPLACE_LABELS}.
*
* @see ResourceManagerAdministrationProtocol#replaceLabelsOnNode
* @param newNodeLabelsName the list of new labels
* @param newNodeLabelsName the list of new labels. It is a QueryParam.
* @param hsr the servlet request
* @param nodeId the node we want to replace the node labels
* @param nodeId the node we want to replace the node labels. It is a
* PathParam.
* @return Response containing the status code
* @throws Exception if an exception happened
*/
@ -309,7 +324,7 @@ public interface RMWebServiceProtocol {
* reachable by using {@link RMWSConsts#ADD_NODE_LABELS}.
*
* @see ResourceManagerAdministrationProtocol#addToClusterNodeLabels
* @param newNodeLabels the node labels to add
* @param newNodeLabels the node labels to add. It is a content param.
* @param hsr the servlet request
* @return Response containing the status code
* @throws Exception in case of bad request
@ -322,7 +337,7 @@ public interface RMWebServiceProtocol {
* reachable by using {@link RMWSConsts#REMOVE_NODE_LABELS}.
*
* @see ResourceManagerAdministrationProtocol#removeFromClusterNodeLabels
* @param oldNodeLabels the node labels to remove
* @param oldNodeLabels the node labels to remove. It is a QueryParam.
* @param hsr the servlet request
* @return Response containing the status code
* @throws Exception in case of bad request
@ -335,7 +350,8 @@ public interface RMWebServiceProtocol {
* reachable by using {@link RMWSConsts#NODES_NODEID_GETLABELS}.
*
* @param hsr the servlet request
* @param nodeId the node we want to get all the node labels
* @param nodeId the node we want to get all the node labels. It is a
* PathParam.
* @return all the labels for a specific node.
* @throws IOException if an IOException happened
*/
@ -347,7 +363,7 @@ public interface RMWebServiceProtocol {
* by using {@link RMWSConsts#APPS_APPID_PRIORITY}.
*
* @param hsr the servlet request
* @param appId the app we want to get the priority
* @param appId the app we want to get the priority. It is a PathParam.
* @return the priority for a specific application
* @throws AuthorizationException in case of the user is not authorized
*/
@ -358,9 +374,11 @@ public interface RMWebServiceProtocol {
* This method updates the priority for a specific application, and it is
* reachable by using {@link RMWSConsts#APPS_APPID_PRIORITY}.
*
* @param targetPriority the priority we want to set for the app
* @param targetPriority the priority we want to set for the app. It is a
* content param.
* @param hsr the servlet request
* @param appId the application we want to update its priority
* @param appId the application we want to update its priority. It is a
* PathParam.
* @return Response containing the status code
* @throws AuthorizationException if the user is not authenticated
* @throws YarnException if the target is null
@ -376,7 +394,8 @@ public interface RMWebServiceProtocol {
* using {@link RMWSConsts#APPS_APPID_QUEUE}.
*
* @param hsr the servlet request
* @param appId the application we want to retrieve its queue
* @param appId the application we want to retrieve its queue. It is a
* PathParam.
* @return the Queue for a specific application.
* @throws AuthorizationException if the user is not authenticated
*/
@ -387,9 +406,10 @@ public interface RMWebServiceProtocol {
* This method updates the queue for a specific application, and it is
* reachable by using {@link RMWSConsts#APPS_APPID_QUEUE}.
*
* @param targetQueue the queue we want to set
* @param targetQueue the queue we want to set. It is a content param.
* @param hsr the servlet request
* @param appId the application we want to change its queue
* @param appId the application we want to change its queue. It is a
* PathParam.
* @return Response containing the status code
* @throws AuthorizationException if the user is not authenticated
* @throws YarnException if the app is not found
@ -424,7 +444,7 @@ public interface RMWebServiceProtocol {
* @see ApplicationClientProtocol#submitApplication
*
* @param newApp structure containing information to construct the
* ApplicationSubmissionContext
* ApplicationSubmissionContext. It is a content param.
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException if the user is not authorized to invoke this
@ -441,7 +461,7 @@ public interface RMWebServiceProtocol {
* by using {@link RMWSConsts#DELEGATION_TOKEN}.
*
* @see ApplicationBaseProtocol#getDelegationToken
* @param tokenData the token to delegate
* @param tokenData the token to delegate. It is a content param.
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException if Kerberos auth failed
@ -508,7 +528,7 @@ public interface RMWebServiceProtocol {
* @see ApplicationClientProtocol#submitReservation
*
* @param resContext provides information to construct the
* ReservationSubmissionRequest
* ReservationSubmissionRequest. It is a content param.
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException if the user is not authorized to invoke this
@ -527,7 +547,7 @@ public interface RMWebServiceProtocol {
* @see ApplicationClientProtocol#updateReservation
*
* @param resContext provides information to construct the
* ReservationUpdateRequest
* ReservationUpdateRequest. It is a content param.
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException if the user is not authorized to invoke this
@ -546,7 +566,7 @@ public interface RMWebServiceProtocol {
* @see ApplicationClientProtocol#deleteReservation
*
* @param resContext provides information to construct the
* ReservationDeleteRequest
* ReservationDeleteRequest. It is a content param.
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException when the user group information cannot be
@ -566,12 +586,13 @@ public interface RMWebServiceProtocol {
* reachable by using {@link RMWSConsts#RESERVATION_LIST}.
*
* @see ApplicationClientProtocol#listReservations
* @param queue filter the result by queue
* @param reservationId filter the result by reservationId
* @param startTime filter the result by start time
* @param endTime filter the result by end time
* @param queue filter the result by queue. It is a QueryParam.
* @param reservationId filter the result by reservationId. It is a
* QueryParam.
* @param startTime filter the result by start time. It is a QueryParam.
* @param endTime filter the result by end time. It is a QueryParam.
* @param includeResourceAllocations true if the resource allocation should be
* in the result, false otherwise
* in the result, false otherwise. It is a QueryParam.
* @param hsr the servlet request
* @return Response containing the status code
* @throws Exception in case of bad request
@ -586,8 +607,8 @@ public interface RMWebServiceProtocol {
* {@link RMWSConsts#APPS_TIMEOUTS_TYPE}.
*
* @param hsr the servlet request
* @param appId the application we want to get the timeout
* @param type the type of the timeouts
* @param appId the application we want to get the timeout. It is a PathParam.
* @param type the type of the timeouts. It is a PathParam.
* @return the timeout for a specific application with a specific type.
* @throws AuthorizationException if the user is not authorized
*/
@ -599,7 +620,8 @@ public interface RMWebServiceProtocol {
* reachable by using {@link RMWSConsts#APPS_TIMEOUTS}.
*
* @param hsr the servlet request
* @param appId the application we want to get the timeouts
* @param appId the application we want to get the timeouts. It is a
* PathParam.
* @return the timeouts for a specific application
* @throws AuthorizationException if the user is not authorized
*/
@ -611,9 +633,9 @@ public interface RMWebServiceProtocol {
* reachable by using {@link RMWSConsts#APPS_TIMEOUT}.
*
* @see ApplicationClientProtocol#updateApplicationTimeouts
* @param appTimeout the appTimeoutInfo
* @param appTimeout the appTimeoutInfo. It is a content param.
* @param hsr the servlet request
* @param appId the application we want to update
* @param appId the application we want to update. It is a PathParam.
* @return Response containing the status code
* @throws AuthorizationException if the user is not authorized to invoke this
* method
@ -631,7 +653,8 @@ public interface RMWebServiceProtocol {
*
* @see ApplicationBaseProtocol#getApplicationAttempts
* @param hsr the servlet request
* @param appId the application we want to get the attempts
* @param appId the application we want to get the attempts. It is a
* PathParam.
* @return all the attempts info for a specific application
*/
AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId);

View File

@ -436,7 +436,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
@QueryParam(RMWSConsts.FINISHED_TIME_END) String finishEnd,
@QueryParam(RMWSConsts.APPLICATION_TYPES) Set<String> applicationTypes,
@QueryParam(RMWSConsts.APPLICATION_TAGS) Set<String> applicationTags,
@QueryParam("deSelects") Set<String> unselectedFields) {
@QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) {
boolean checkCount = false;
boolean checkStart = false;
boolean checkEnd = false;
@ -823,7 +823,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
@Override
public AppInfo getApp(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId,
@QueryParam("deSelects") Set<String> unselectedFields) {
@QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) {
init();
ApplicationId id = WebAppUtils.parseApplicationId(recordFactory, appId);
RMApp app = rm.getRMContext().getRMApps().get(id);

View File

@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@XmlRootElement(name = "appAttempt")
@ -106,4 +105,8 @@ public class AppAttemptInfo {
public String getLogsLink() {
return this.logsLink;
}
public String getAppAttemptId() {
return this.appAttemptId;
}
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -86,12 +87,8 @@ public class TestFederationRMStateStoreService {
// Initially there should be no entry for the sub-cluster
rm.init(conf);
stateStore = rm.getFederationStateStoreService().getStateStoreClient();
try {
stateStore.getSubCluster(request);
Assert.fail("There should be no entry for the sub-cluster.");
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().endsWith("does not exist"));
}
GetSubClusterInfoResponse response = stateStore.getSubCluster(request);
Assert.assertNull(response);
// Validate if sub-cluster is registered
rm.start();

View File

@ -48,13 +48,6 @@
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
@ -72,6 +65,37 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.router;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ShutdownHookManager;
@ -28,11 +29,19 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
import org.apache.hadoop.yarn.server.router.webapp.RouterWebApp;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* The router is a stateless YARN component which is the entry point to the
* cluster. It can be deployed on multiple nodes behind a Virtual IP (VIP) with
@ -56,6 +65,9 @@ public class Router extends CompositeService {
private AtomicBoolean isStopping = new AtomicBoolean(false);
private RouterClientRMService clientRMProxyService;
private RouterRMAdminService rmAdminProxyService;
private WebApp webApp;
@VisibleForTesting
protected String webAppAddress;
/**
* Priority of the Router shutdown hook.
@ -79,6 +91,10 @@ public class Router extends CompositeService {
// RMAdmin Proxy
rmAdminProxyService = createRMAdminProxyService();
addService(rmAdminProxyService);
// WebService
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
YarnConfiguration.ROUTER_BIND_HOST,
WebAppUtils.getRouterWebAppURLWithoutScheme(this.conf));
super.serviceInit(conf);
}
@ -89,11 +105,15 @@ public class Router extends CompositeService {
} catch (IOException e) {
throw new YarnRuntimeException("Failed Router login", e);
}
startWepApp();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (webApp != null) {
webApp.stop();
}
if (isStopping.getAndSet(true)) {
return;
}
@ -117,6 +137,21 @@ public class Router extends CompositeService {
return new RouterRMAdminService();
}
@Private
public WebApp getWebapp() {
return this.webApp;
}
@VisibleForTesting
public void startWepApp() {
RMWebAppUtil.setupSecurityAndFilters(conf, null);
Builder<Object> builder =
WebApps.$for("cluster", null, null, "ws").with(conf).at(webAppAddress);
webApp = builder.start(new RouterWebApp(this));
}
public static void main(String[] argv) {
Configuration conf = new YarnConfiguration();
Thread

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import org.apache.hadoop.conf.Configuration;
/**
* Extends the RequestInterceptor class and provides common functionality which
* can be used and/or extended by other concrete intercepter classes.
*/
public abstract class AbstractRESTRequestInterceptor
implements RESTRequestInterceptor {
private Configuration conf;
private RESTRequestInterceptor nextInterceptor;
/**
* Sets the {@link RESTRequestInterceptor} in the chain.
*/
@Override
public void setNextInterceptor(RESTRequestInterceptor nextInterceptor) {
this.nextInterceptor = nextInterceptor;
}
/**
* Sets the {@link Configuration}.
*/
@Override
public void setConf(Configuration conf) {
this.conf = conf;
if (this.nextInterceptor != null) {
this.nextInterceptor.setConf(conf);
}
}
/**
* Gets the {@link Configuration}.
*/
@Override
public Configuration getConf() {
return this.conf;
}
/**
* Initializes the {@link RESTRequestInterceptor}.
*/
@Override
public void init(String user) {
if (this.nextInterceptor != null) {
this.nextInterceptor.init(user);
}
}
/**
* Disposes the {@link RESTRequestInterceptor}.
*/
@Override
public void shutdown() {
if (this.nextInterceptor != null) {
this.nextInterceptor.shutdown();
}
}
/**
* Gets the next {@link RESTRequestInterceptor} in the chain.
*/
@Override
public RESTRequestInterceptor getNextInterceptor() {
return this.nextInterceptor;
}
}

View File

@ -0,0 +1,496 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
/**
* Extends the AbstractRequestInterceptorClient class and provides an
* implementation that simply forwards the client requests to the resource
* manager.
*/
public final class DefaultRequestInterceptorREST
extends AbstractRESTRequestInterceptor {
private String webAppAddress;
@Override
public void init(String user) {
webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(getConf());
}
@Override
public ClusterInfo get() {
return getClusterInfo();
}
@Override
public ClusterInfo getClusterInfo() {
return RouterWebServiceUtil.genericForward(webAppAddress, null,
ClusterInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.INFO, null, null);
}
@Override
public ClusterMetricsInfo getClusterMetricsInfo() {
return RouterWebServiceUtil.genericForward(webAppAddress, null,
ClusterMetricsInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.METRICS, null, null);
}
@Override
public SchedulerTypeInfo getSchedulerInfo() {
return RouterWebServiceUtil.genericForward(webAppAddress, null,
SchedulerTypeInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER, null, null);
}
@Override
public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
throws IOException {
// time is specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, null,
String.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_LOGS, null, null);
}
@Override
public NodesInfo getNodes(String states) {
// states will be part of additionalParam
Map<String, String[]> additionalParam = new HashMap<String, String[]>();
additionalParam.put(RMWSConsts.STATES, new String[] {states});
return RouterWebServiceUtil.genericForward(webAppAddress, null,
NodesInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES, null,
additionalParam);
}
@Override
public NodeInfo getNode(String nodeId) {
return RouterWebServiceUtil.genericForward(webAppAddress, null,
NodeInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES + "/" + nodeId, null,
null);
}
@Override
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
Set<String> statesQuery, String finalStatusQuery, String userQuery,
String queueQuery, String count, String startedBegin, String startedEnd,
String finishBegin, String finishEnd, Set<String> applicationTypes,
Set<String> applicationTags, Set<String> unselectedFields) {
// all the params are specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
AppsInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS, null, null);
}
@Override
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
// nodeId is specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
ActivitiesInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_ACTIVITIES, null,
null);
}
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time) {
// time and appId are specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
AppActivitiesInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_APP_ACTIVITIES,
null, null);
}
@Override
public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
Set<String> stateQueries, Set<String> typeQueries) {
// stateQueries and typeQueries are specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
ApplicationStatisticsInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APP_STATISTICS, null, null);
}
@Override
public AppInfo getApp(HttpServletRequest hsr, String appId,
Set<String> unselectedFields) {
// unselectedFields is specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
AppInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId, null,
null);
}
@Override
public AppState getAppState(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
AppState.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH
+ RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.STATE,
null, null);
}
@Override
public Response updateAppState(AppState targetState, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.PUT, RMWSConsts.RM_WEB_SERVICE_PATH
+ RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.STATE,
targetState, null);
}
@Override
public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
throws IOException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
NodeToLabelsInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.GET_NODE_TO_LABELS, null,
null);
}
@Override
public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
throws IOException {
// labels will be part of additionalParam
Map<String, String[]> additionalParam = new HashMap<String, String[]>();
additionalParam.put(RMWSConsts.LABELS,
labels.toArray(new String[labels.size()]));
return RouterWebServiceUtil.genericForward(webAppAddress, null,
LabelsToNodesInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.LABEL_MAPPINGS, null,
additionalParam);
}
@Override
public Response replaceLabelsOnNodes(NodeToLabelsEntryList newNodeToLabels,
HttpServletRequest hsr) throws IOException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.REPLACE_NODE_TO_LABELS,
newNodeToLabels, null);
}
@Override
public Response replaceLabelsOnNode(Set<String> newNodeLabelsName,
HttpServletRequest hsr, String nodeId) throws Exception {
// newNodeLabelsName is specified inside hsr
return RouterWebServiceUtil
.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST, RMWSConsts.RM_WEB_SERVICE_PATH
+ RMWSConsts.NODES + "/" + nodeId + "/replace-labels",
null, null);
}
@Override
public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr)
throws IOException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
NodeLabelsInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.GET_NODE_LABELS, null,
null);
}
@Override
public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
HttpServletRequest hsr) throws Exception {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.ADD_NODE_LABELS,
newNodeLabels, null);
}
@Override
public Response removeFromCluserNodeLabels(Set<String> oldNodeLabels,
HttpServletRequest hsr) throws Exception {
// oldNodeLabels is specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.REMOVE_NODE_LABELS, null,
null);
}
@Override
public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
throws IOException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
NodeLabelsInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH
+ RMWSConsts.NODES + "/" + nodeId + "/get-labels",
null, null);
}
@Override
public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
AppPriority.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH
+ RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.PRIORITY,
null, null);
}
@Override
public Response updateApplicationPriority(AppPriority targetPriority,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.PUT, RMWSConsts.RM_WEB_SERVICE_PATH
+ RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.PRIORITY,
targetPriority, null);
}
@Override
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
AppQueue.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH
+ RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.QUEUE,
null, null);
}
@Override
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.PUT, RMWSConsts.RM_WEB_SERVICE_PATH
+ RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.QUEUE,
targetQueue, null);
}
@Override
public Response createNewApplication(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS_NEW_APPLICATION, null,
null);
}
@Override
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS, newApp, null);
}
@Override
public Response postDelegationToken(DelegationToken tokenData,
HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException, Exception {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.DELEGATION_TOKEN, tokenData,
null);
}
@Override
public Response postDelegationTokenExpiration(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.DELEGATION_TOKEN_EXPIRATION,
null, null);
}
@Override
public Response cancelDelegationToken(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.DELETE,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.DELEGATION_TOKEN, null,
null);
}
@Override
public Response createNewReservation(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_NEW, null,
null);
}
@Override
public Response submitReservation(ReservationSubmissionRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_SUBMIT,
resContext, null);
}
@Override
public Response updateReservation(ReservationUpdateRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_UPDATE,
resContext, null);
}
@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_DELETE,
resContext, null);
}
@Override
public Response listReservation(String queue, String reservationId,
long startTime, long endTime, boolean includeResourceAllocations,
HttpServletRequest hsr) throws Exception {
// queue, reservationId, startTime, endTime, includeResourceAllocations are
// specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_LIST, null,
null);
}
@Override
public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId,
String type) throws AuthorizationException {
return RouterWebServiceUtil
.genericForward(webAppAddress, hsr, AppTimeoutInfo.class,
HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS
+ "/" + appId + "/" + RMWSConsts.TIMEOUTS + "/" + type,
null, null);
}
@Override
public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
AppTimeoutsInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH
+ RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.TIMEOUTS,
null, null);
}
@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Response.class, HTTPMethods.PUT, RMWSConsts.RM_WEB_SERVICE_PATH
+ RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.TIMEOUT,
appTimeout, null);
}
@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
AppAttemptsInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH
+ RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.APPATTEMPTS,
null, null);
}
@Override
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
return RouterWebServiceUtil.genericForward(webAppAddress, req,
AppAttemptInfo.class,
HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/"
+ appId + "/" + RMWSConsts.APPATTEMPTS + "/" + appAttemptId,
null, null);
}
@Override
public ContainersInfo getContainers(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
return RouterWebServiceUtil.genericForward(webAppAddress, req,
ContainersInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/"
+ RMWSConsts.APPATTEMPTS + "/" + appAttemptId + "/"
+ RMWSConsts.CONTAINERS,
null, null);
}
@Override
public ContainerInfo getContainer(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId,
String containerId) {
return RouterWebServiceUtil.genericForward(webAppAddress, req,
ContainerInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/"
+ RMWSConsts.APPATTEMPTS + "/" + appAttemptId + "/"
+ RMWSConsts.CONTAINERS + "/" + containerId,
null, null);
}
@Override
public void setNextInterceptor(RESTRequestInterceptor next) {
throw new YarnRuntimeException("setNextInterceptor is being called on "
+ "DefaultRequestInterceptorREST, which should be the last one "
+ "in the chain. Check if the interceptor pipeline configuration "
+ "is correct");
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
/**
* HTTP verbs.
**/
public enum HTTPMethods {
/* to retrieve resource representation/information */
GET,
/* to update existing resource */
PUT,
/* to delete resources */
DELETE,
/* to create new subordinate resources */
POST
}

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServiceProtocol;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
/**
* Defines the contract to be implemented by the request intercepter classes,
* that can be used to intercept and inspect messages sent from the client to
* the resource manager server.
*
* This class includes 4 methods getAppAttempts, getAppAttempt, getContainers
* and getContainer that belong to {@link WebServices}. They are in this class
* to make sure that RouterWebServices implements the same REST methods of
* {@code RMWebServices}.
*/
public interface RESTRequestInterceptor
extends RMWebServiceProtocol, Configurable {
/**
* This method is called for initializing the intercepter. This is guaranteed
* to be called only once in the lifetime of this instance.
*
* @param user the name of the client
*/
void init(String user);
/**
* This method is called to release the resources held by the intercepter.
* This will be called when the application pipeline is being destroyed. The
* concrete implementations should dispose the resources and forward the
* request to the next intercepter, if any.
*/
void shutdown();
/**
* Sets the next intercepter in the pipeline. The concrete implementation of
* this interface should always pass the request to the nextInterceptor after
* inspecting the message. The last intercepter in the chain is responsible to
* send the messages to the resource manager service and so the last
* intercepter will not receive this method call.
*
* @param nextInterceptor the RESTRequestInterceptor to set in the pipeline
*/
void setNextInterceptor(RESTRequestInterceptor nextInterceptor);
/**
* Returns the next intercepter in the chain.
*
* @return the next intercepter in the chain
*/
RESTRequestInterceptor getNextInterceptor();
/**
*
* @see WebServices#getAppAttempt(HttpServletRequest, HttpServletResponse,
* String, String)
* @param req the servlet request
* @param res the servlet response
* @param appId the application we want to get the appAttempt. It is a
* PathParam.
* @param appAttemptId the AppAttempt we want to get the info. It is a
* PathParam.
* @return AppAttemptInfo of the specific AppAttempt
*/
AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId);
/**
*
* @see WebServices#getContainers(HttpServletRequest, HttpServletResponse,
* String, String)
* @param req the servlet request
* @param res the servlet response
* @param appId the application we want to get the containers info. It is a
* PathParam.
* @param appAttemptId the AppAttempt we want to get the info. It is a
* PathParam.
* @return ContainersInfo of all the containers that belong to the specific
* AppAttempt
*/
ContainersInfo getContainers(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId);
/**
*
* @see WebServices#getContainer(HttpServletRequest, HttpServletResponse,
* String, String, String)
* @param req the servlet request
* @param res the servlet response
* @param appId the application we want to get the containers info. It is a
* PathParam.
* @param appAttemptId the AppAttempt we want to get the info. It is a
* PathParam.
* @param containerId the container we want to get the info. It is a
* PathParam.
* @return ContainerInfo of the specific ContainerId
*/
ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId, String containerId);
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
/**
* The Router webapp.
*/
public class RouterWebApp extends WebApp implements YarnWebParams {
private Router router;
public RouterWebApp(Router router) {
this.router = router;
}
@Override
public void setup() {
bind(JAXBContextResolver.class);
bind(RouterWebServices.class);
bind(GenericExceptionHandler.class);
bind(RouterWebApp.class).toInstance(this);
if (router != null) {
bind(Router.class).toInstance(router);
}
}
}

View File

@ -0,0 +1,227 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import com.sun.jersey.api.ConflictException;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.WebResource.Builder;
import com.sun.jersey.core.util.MultivaluedMapImpl;
/**
* The Router webservice util class.
*/
public final class RouterWebServiceUtil {
private static String user = "YarnRouter";
private static final Log LOG =
LogFactory.getLog(RouterWebServiceUtil.class.getName());
/** Disable constructor. */
private RouterWebServiceUtil() {
}
/**
* Creates and performs a REST call to a specific WebService.
*
* @param webApp the address of the remote webap
* @param hsr the servlet request
* @param returnType the return type of the REST call
* @param <T> Type of return object.
* @param method the HTTP method of the REST call
* @param targetPath additional path to add to the webapp address
* @param formParam the form parameters as input for a specific REST call
* @param additionalParam the query parameters as input for a specific REST
* call in case the call has no servlet request
* @return the retrieved entity from the REST call
*/
protected static <T> T genericForward(String webApp, HttpServletRequest hsr,
final Class<T> returnType, HTTPMethods method, String targetPath,
Object formParam, Map<String, String[]> additionalParam) {
UserGroupInformation callerUGI = null;
if (hsr != null) {
callerUGI = RMWebAppUtil.getCallerUserGroupInformation(hsr, true);
} else {
// user not required
callerUGI = UserGroupInformation.createRemoteUser(user);
}
if (callerUGI == null) {
LOG.error("Unable to obtain user name, user not authenticated");
return null;
}
try {
return callerUGI.doAs(new PrivilegedExceptionAction<T>() {
@SuppressWarnings("unchecked")
@Override
public T run() {
Map<String, String[]> paramMap = null;
// We can have hsr or additionalParam. There are no case with both.
if (hsr != null) {
paramMap = hsr.getParameterMap();
} else if (additionalParam != null) {
paramMap = additionalParam;
}
ClientResponse response = RouterWebServiceUtil.invokeRMWebService(
webApp, targetPath, method,
(hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam);
if (Response.class.equals(returnType)) {
return (T) RouterWebServiceUtil.clientResponseToResponse(response);
}
// YARN RM can answer with Status.OK or it throws an exception
if (response.getStatus() == 200) {
return response.getEntity(returnType);
}
RouterWebServiceUtil.retrieveException(response);
return null;
}
});
} catch (InterruptedException e) {
return null;
} catch (IOException e) {
return null;
}
}
/**
* Performs an invocation of a REST call on a remote RMWebService.
*
* @param additionalParam
*/
private static ClientResponse invokeRMWebService(String webApp, String path,
HTTPMethods method, String additionalPath,
Map<String, String[]> queryParams, Object formParam) {
Client client = Client.create();
WebResource webResource = client.resource(webApp).path(path);
if (additionalPath != null && !additionalPath.isEmpty()) {
webResource = webResource.path(additionalPath);
}
if (queryParams != null && !queryParams.isEmpty()) {
MultivaluedMap<String, String> paramMap = new MultivaluedMapImpl();
for (Entry<String, String[]> param : queryParams.entrySet()) {
String[] values = param.getValue();
for (int i = 0; i < values.length; i++) {
paramMap.add(param.getKey(), values[i]);
}
}
webResource = webResource.queryParams(paramMap);
}
// I can forward the call in JSON or XML since the Router will convert it
// again in Object before send it back to the client
Builder builder = null;
if (formParam != null) {
builder = webResource.entity(formParam, MediaType.APPLICATION_XML);
builder = builder.accept(MediaType.APPLICATION_XML);
} else {
builder = webResource.accept(MediaType.APPLICATION_XML);
}
ClientResponse response = null;
switch (method) {
case DELETE:
response = builder.delete(ClientResponse.class);
break;
case GET:
response = builder.get(ClientResponse.class);
break;
case POST:
response = builder.post(ClientResponse.class);
break;
case PUT:
response = builder.put(ClientResponse.class);
break;
default:
break;
}
return response;
}
public static Response clientResponseToResponse(ClientResponse r) {
if (r == null) {
return null;
}
// copy the status code
ResponseBuilder rb = Response.status(r.getStatus());
// copy all the headers
for (Entry<String, List<String>> entry : r.getHeaders().entrySet()) {
for (String value : entry.getValue()) {
rb.header(entry.getKey(), value);
}
}
// copy the entity
rb.entity(r.getEntityInputStream());
// return the response
return rb.build();
}
public static void retrieveException(ClientResponse response) {
String serverErrorMsg = response.getEntity(String.class);
int status = response.getStatus();
if (status == 400) {
throw new BadRequestException(serverErrorMsg);
}
if (status == 403) {
throw new ForbiddenException(serverErrorMsg);
}
if (status == 404) {
throw new NotFoundException(serverErrorMsg);
}
if (status == 409) {
throw new ConflictException(serverErrorMsg);
}
}
}

View File

@ -0,0 +1,876 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServiceProtocol;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.google.inject.Singleton;
/**
* RouterWebServices is a service that runs on each router that can be used to
* intercept and inspect {@link RMWebServiceProtocol} messages from client to
* the cluster resource manager. It listens {@link RMWebServiceProtocol} REST
* messages from the client and creates a request intercepting pipeline instance
* for each client. The pipeline is a chain of {@link RESTRequestInterceptor}
* instances that can inspect and modify the request/response as needed. The
* main difference with AMRMProxyService is the protocol they implement.
**/
@Singleton
@Path("/ws/v1/cluster")
public class RouterWebServices implements RMWebServiceProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(RouterWebServices.class);
private final Router router;
private final Configuration conf;
private @Context HttpServletResponse response;
private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
// -------Default values of QueryParams for RMWebServiceProtocol--------
public static final String DEFAULT_QUEUE = "default";
public static final String DEFAULT_RESERVATION_ID = "";
public static final String DEFAULT_START_TIME = "0";
public static final String DEFAULT_END_TIME = "-1";
public static final String DEFAULT_INCLUDE_RESOURCE = "false";
@Inject
public RouterWebServices(final Router router, Configuration conf) {
this.router = router;
this.conf = conf;
int maxCacheSize =
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
this.userPipelineMap = Collections.synchronizedMap(
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
maxCacheSize, true));
}
/**
* Returns the comma separated intercepter class names from the configuration.
*
* @param conf
* @return the intercepter class names as an instance of ArrayList
*/
private List<String> getInterceptorClassNames(Configuration config) {
String configuredInterceptorClassNames =
config.get(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_INTERCEPTOR_CLASS);
List<String> interceptorClassNames = new ArrayList<String>();
Collection<String> tempList =
StringUtils.getStringCollection(configuredInterceptorClassNames);
for (String item : tempList) {
interceptorClassNames.add(item.trim());
}
return interceptorClassNames;
}
private void init() {
// clear content type
response.setContentType(null);
}
@VisibleForTesting
protected RequestInterceptorChainWrapper getInterceptorChain() {
String user = "";
try {
user = UserGroupInformation.getCurrentUser().getUserName();
} catch (IOException e) {
LOG.error("IOException " + e.getMessage());
}
if (!userPipelineMap.containsKey(user)) {
initializePipeline(user);
}
return userPipelineMap.get(user);
}
/**
* Gets the Request intercepter chains for all the users.
*
* @return the request intercepter chains.
*/
@VisibleForTesting
protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
return this.userPipelineMap;
}
/**
* This method creates and returns reference of the first intercepter in the
* chain of request intercepter instances.
*
* @return the reference of the first intercepter in the chain
*/
@VisibleForTesting
protected RESTRequestInterceptor createRequestInterceptorChain() {
List<String> interceptorClassNames = getInterceptorClassNames(conf);
RESTRequestInterceptor pipeline = null;
RESTRequestInterceptor current = null;
for (String interceptorClassName : interceptorClassNames) {
try {
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
if (RESTRequestInterceptor.class.isAssignableFrom(interceptorClass)) {
RESTRequestInterceptor interceptorInstance =
(RESTRequestInterceptor) ReflectionUtils
.newInstance(interceptorClass, conf);
if (pipeline == null) {
pipeline = interceptorInstance;
current = interceptorInstance;
continue;
} else {
current.setNextInterceptor(interceptorInstance);
current = interceptorInstance;
}
} else {
throw new YarnRuntimeException(
"Class: " + interceptorClassName + " not instance of "
+ RESTRequestInterceptor.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate RESTRequestInterceptor: "
+ interceptorClassName,
e);
}
}
if (pipeline == null) {
throw new YarnRuntimeException(
"RequestInterceptor pipeline is not configured in the system");
}
return pipeline;
}
/**
* Initializes the request intercepter pipeline for the specified user.
*
* @param user
*/
private void initializePipeline(String user) {
RequestInterceptorChainWrapper chainWrapper = null;
synchronized (this.userPipelineMap) {
if (this.userPipelineMap.containsKey(user)) {
LOG.info("Request to start an already existing user: {}"
+ " was received, so ignoring.", user);
return;
}
chainWrapper = new RequestInterceptorChainWrapper();
this.userPipelineMap.put(user, chainWrapper);
}
// We register the pipeline instance in the map first and then initialize it
// later because chain initialization can be expensive and we would like to
// release the lock as soon as possible to prevent other applications from
// blocking when one application's chain is initializing
LOG.info("Initializing request processing pipeline for the user: {}", user);
try {
RESTRequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(user);
chainWrapper.init(interceptorChain);
} catch (Exception e) {
synchronized (this.userPipelineMap) {
this.userPipelineMap.remove(user);
}
throw e;
}
}
/**
* Private structure for encapsulating RequestInterceptor and user instances.
*
*/
@Private
public static class RequestInterceptorChainWrapper {
private RESTRequestInterceptor rootInterceptor;
/**
* Initializes the wrapper with the specified parameters.
*
* @param interceptor the first interceptor in the pipeline
*/
public synchronized void init(RESTRequestInterceptor interceptor) {
this.rootInterceptor = interceptor;
}
/**
* Gets the root request intercepter.
*
* @return the root request intercepter
*/
public synchronized RESTRequestInterceptor getRootInterceptor() {
return rootInterceptor;
}
/**
* Shutdown the chain of interceptors when the object is destroyed.
*/
@Override
protected void finalize() {
rootInterceptor.shutdown();
}
}
@GET
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public ClusterInfo get() {
return getClusterInfo();
}
@GET
@Path(RMWSConsts.INFO)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public ClusterInfo getClusterInfo() {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getClusterInfo();
}
@GET
@Path(RMWSConsts.METRICS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public ClusterMetricsInfo getClusterMetricsInfo() {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getClusterMetricsInfo();
}
@GET
@Path(RMWSConsts.SCHEDULER)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public SchedulerTypeInfo getSchedulerInfo() {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getSchedulerInfo();
}
@POST
@Path(RMWSConsts.SCHEDULER_LOGS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public String dumpSchedulerLogs(@FormParam(RMWSConsts.TIME) String time,
@Context HttpServletRequest hsr) throws IOException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().dumpSchedulerLogs(time, hsr);
}
@GET
@Path(RMWSConsts.NODES)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getNodes(states);
}
@GET
@Path(RMWSConsts.NODES_NODEID)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public NodeInfo getNode(@PathParam(RMWSConsts.NODEID) String nodeId) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getNode(nodeId);
}
@GET
@Path(RMWSConsts.APPS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppsInfo getApps(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.STATE) String stateQuery,
@QueryParam(RMWSConsts.STATES) Set<String> statesQuery,
@QueryParam(RMWSConsts.FINAL_STATUS) String finalStatusQuery,
@QueryParam(RMWSConsts.USER) String userQuery,
@QueryParam(RMWSConsts.QUEUE) String queueQuery,
@QueryParam(RMWSConsts.LIMIT) String count,
@QueryParam(RMWSConsts.STARTED_TIME_BEGIN) String startedBegin,
@QueryParam(RMWSConsts.STARTED_TIME_END) String startedEnd,
@QueryParam(RMWSConsts.FINISHED_TIME_BEGIN) String finishBegin,
@QueryParam(RMWSConsts.FINISHED_TIME_END) String finishEnd,
@QueryParam(RMWSConsts.APPLICATION_TYPES) Set<String> applicationTypes,
@QueryParam(RMWSConsts.APPLICATION_TAGS) Set<String> applicationTags,
@QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getApps(hsr, stateQuery, statesQuery,
finalStatusQuery, userQuery, queueQuery, count, startedBegin,
startedEnd, finishBegin, finishEnd, applicationTypes, applicationTags,
unselectedFields);
}
@GET
@Path(RMWSConsts.SCHEDULER_ACTIVITIES)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.NODEID) String nodeId) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getActivities(hsr, nodeId);
}
@GET
@Path(RMWSConsts.SCHEDULER_APP_ACTIVITIES)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.APP_ID) String appId,
@QueryParam(RMWSConsts.MAX_TIME) String time) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time);
}
@GET
@Path(RMWSConsts.APP_STATISTICS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public ApplicationStatisticsInfo getAppStatistics(
@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.STATES) Set<String> stateQueries,
@QueryParam(RMWSConsts.APPLICATION_TYPES) Set<String> typeQueries) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getAppStatistics(hsr, stateQueries,
typeQueries);
}
@GET
@Path(RMWSConsts.APPS_APPID)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppInfo getApp(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId,
@QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getApp(hsr, appId, unselectedFields);
}
@GET
@Path(RMWSConsts.APPS_APPID_STATE)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppState getAppState(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getAppState(hsr, appId);
}
@PUT
@Path(RMWSConsts.APPS_APPID_STATE)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response updateAppState(AppState targetState,
@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().updateAppState(targetState, hsr,
appId);
}
@GET
@Path(RMWSConsts.GET_NODE_TO_LABELS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr)
throws IOException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getNodeToLabels(hsr);
}
@GET
@Path(RMWSConsts.LABEL_MAPPINGS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public LabelsToNodesInfo getLabelsToNodes(
@QueryParam(RMWSConsts.LABELS) Set<String> labels) throws IOException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getLabelsToNodes(labels);
}
@POST
@Path(RMWSConsts.REPLACE_NODE_TO_LABELS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response replaceLabelsOnNodes(
final NodeToLabelsEntryList newNodeToLabels,
@Context HttpServletRequest hsr) throws Exception {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().replaceLabelsOnNodes(newNodeToLabels,
hsr);
}
@POST
@Path(RMWSConsts.NODES_NODEID_REPLACE_LABELS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response replaceLabelsOnNode(
@QueryParam(RMWSConsts.LABELS) Set<String> newNodeLabelsName,
@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.NODEID) String nodeId) throws Exception {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().replaceLabelsOnNode(newNodeLabelsName,
hsr, nodeId);
}
@GET
@Path(RMWSConsts.GET_NODE_LABELS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr)
throws IOException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getClusterNodeLabels(hsr);
}
@POST
@Path(RMWSConsts.ADD_NODE_LABELS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
@Context HttpServletRequest hsr) throws Exception {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().addToClusterNodeLabels(newNodeLabels,
hsr);
}
@POST
@Path(RMWSConsts.REMOVE_NODE_LABELS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response removeFromCluserNodeLabels(
@QueryParam(RMWSConsts.LABELS) Set<String> oldNodeLabels,
@Context HttpServletRequest hsr) throws Exception {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor()
.removeFromCluserNodeLabels(oldNodeLabels, hsr);
}
@GET
@Path(RMWSConsts.NODES_NODEID_GETLABELS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.NODEID) String nodeId) throws IOException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getLabelsOnNode(hsr, nodeId);
}
@GET
@Path(RMWSConsts.APPS_APPID_PRIORITY)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppPriority getAppPriority(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getAppPriority(hsr, appId);
}
@PUT
@Path(RMWSConsts.APPS_APPID_PRIORITY)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response updateApplicationPriority(AppPriority targetPriority,
@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor()
.updateApplicationPriority(targetPriority, hsr, appId);
}
@GET
@Path(RMWSConsts.APPS_APPID_QUEUE)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppQueue getAppQueue(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getAppQueue(hsr, appId);
}
@PUT
@Path(RMWSConsts.APPS_APPID_QUEUE)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response updateAppQueue(AppQueue targetQueue,
@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().updateAppQueue(targetQueue, hsr,
appId);
}
@POST
@Path(RMWSConsts.APPS_NEW_APPLICATION)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response createNewApplication(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().createNewApplication(hsr);
}
@POST
@Path(RMWSConsts.APPS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().submitApplication(newApp, hsr);
}
@POST
@Path(RMWSConsts.DELEGATION_TOKEN)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response postDelegationToken(DelegationToken tokenData,
@Context HttpServletRequest hsr) throws AuthorizationException,
IOException, InterruptedException, Exception {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().postDelegationToken(tokenData, hsr);
}
@POST
@Path(RMWSConsts.DELEGATION_TOKEN_EXPIRATION)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response postDelegationTokenExpiration(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, Exception {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().postDelegationTokenExpiration(hsr);
}
@DELETE
@Path(RMWSConsts.DELEGATION_TOKEN)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response cancelDelegationToken(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().cancelDelegationToken(hsr);
}
@POST
@Path(RMWSConsts.RESERVATION_NEW)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response createNewReservation(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().createNewReservation(hsr);
}
@POST
@Path(RMWSConsts.RESERVATION_SUBMIT)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response submitReservation(ReservationSubmissionRequestInfo resContext,
@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().submitReservation(resContext, hsr);
}
@POST
@Path(RMWSConsts.RESERVATION_UPDATE)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response updateReservation(ReservationUpdateRequestInfo resContext,
@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().updateReservation(resContext, hsr);
}
@POST
@Path(RMWSConsts.RESERVATION_DELETE)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext,
@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().deleteReservation(resContext, hsr);
}
@GET
@Path(RMWSConsts.RESERVATION_LIST)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response listReservation(
@QueryParam(RMWSConsts.QUEUE) @DefaultValue(DEFAULT_QUEUE) String queue,
@QueryParam(RMWSConsts.RESERVATION_ID) @DefaultValue(DEFAULT_RESERVATION_ID) String reservationId,
@QueryParam(RMWSConsts.START_TIME) @DefaultValue(DEFAULT_START_TIME) long startTime,
@QueryParam(RMWSConsts.END_TIME) @DefaultValue(DEFAULT_END_TIME) long endTime,
@QueryParam(RMWSConsts.INCLUDE_RESOURCE) @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations,
@Context HttpServletRequest hsr) throws Exception {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().listReservation(queue, reservationId,
startTime, endTime, includeResourceAllocations, hsr);
}
@GET
@Path(RMWSConsts.APPS_TIMEOUTS_TYPE)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppTimeoutInfo getAppTimeout(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId,
@PathParam(RMWSConsts.TYPE) String type) throws AuthorizationException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getAppTimeout(hsr, appId, type);
}
@GET
@Path(RMWSConsts.APPS_TIMEOUTS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppTimeoutsInfo getAppTimeouts(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getAppTimeouts(hsr, appId);
}
@PUT
@Path(RMWSConsts.APPS_TIMEOUT)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().updateApplicationTimeout(appTimeout,
hsr, appId);
}
@GET
@Path(RMWSConsts.APPS_APPID_APPATTEMPTS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppAttemptsInfo getAppAttempts(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getAppAttempts(hsr, appId);
}
@GET
@Path(RMWSConsts.APPS_APPID_APPATTEMPTS_APPATTEMPTID)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
public org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo getAppAttempt(
@Context HttpServletRequest req, @Context HttpServletResponse res,
@PathParam(RMWSConsts.APPID) String appId,
@PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getAppAttempt(req, res, appId,
appAttemptId);
}
@GET
@Path(RMWSConsts.APPS_APPID_APPATTEMPTS_APPATTEMPTID_CONTAINERS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
public ContainersInfo getContainers(@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam(RMWSConsts.APPID) String appId,
@PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getContainers(req, res, appId,
appAttemptId);
}
@GET
@Path(RMWSConsts.GET_CONTAINER)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
public ContainerInfo getContainer(@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam(RMWSConsts.APPID) String appId,
@PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId,
@PathParam(RMWSConsts.CONTAINERID) String containerId) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getContainer(req, res, appId,
appAttemptId, containerId);
}
@VisibleForTesting
protected void setResponse(HttpServletResponse response) {
this.response = response;
}
}

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Router WebApp package. **/
package org.apache.hadoop.yarn.server.router.webapp;

View File

@ -0,0 +1,601 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.server.router.webapp.RouterWebServices.RequestInterceptorChainWrapper;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.mockito.Mockito;
/**
* Base class for all the RouterRMAdminService test cases. It provides utility
* methods that can be used by the concrete test case classes.
*
*/
public abstract class BaseRouterWebServicesTest {
private YarnConfiguration conf;
private Router router;
public final static int TEST_MAX_CACHE_SIZE = 10;
private RouterWebServices routerWebService;
@Before
public void setup() {
conf = new YarnConfiguration();
String mockPassThroughInterceptorClass =
PassThroughRESTRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain will call the mock resource manager. The others in the chain will
// simply forward it to the next one in the chain
conf.set(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + mockPassThroughInterceptorClass + ","
+ MockRESTRequestInterceptor.class.getName());
conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
TEST_MAX_CACHE_SIZE);
router = spy(new Router());
Mockito.doNothing().when(router).startWepApp();
routerWebService = new RouterWebServices(router, conf);
routerWebService.setResponse(mock(HttpServletResponse.class));
router.init(conf);
router.start();
}
@After
public void tearDown() {
if (router != null) {
router.stop();
}
}
protected RouterWebServices getRouterWebServices() {
Assert.assertNotNull(this.routerWebService);
return this.routerWebService;
}
protected ClusterInfo get(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<ClusterInfo>() {
@Override
public ClusterInfo run() throws Exception {
return routerWebService.get();
}
});
}
protected ClusterInfo getClusterInfo(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<ClusterInfo>() {
@Override
public ClusterInfo run() throws Exception {
return routerWebService.getClusterInfo();
}
});
}
protected ClusterMetricsInfo getClusterMetricsInfo(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<ClusterMetricsInfo>() {
@Override
public ClusterMetricsInfo run() throws Exception {
return routerWebService.getClusterMetricsInfo();
}
});
}
protected SchedulerTypeInfo getSchedulerInfo(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<SchedulerTypeInfo>() {
@Override
public SchedulerTypeInfo run() throws Exception {
return routerWebService.getSchedulerInfo();
}
});
}
protected String dumpSchedulerLogs(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
return routerWebService.dumpSchedulerLogs(null, null);
}
});
}
protected NodesInfo getNodes(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<NodesInfo>() {
@Override
public NodesInfo run() throws Exception {
return routerWebService.getNodes(null);
}
});
}
protected NodeInfo getNode(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<NodeInfo>() {
@Override
public NodeInfo run() throws Exception {
return routerWebService.getNode(null);
}
});
}
protected AppsInfo getApps(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<AppsInfo>() {
@Override
public AppsInfo run() throws Exception {
return routerWebService.getApps(null, null, null, null, null, null,
null, null, null, null, null, null, null, null);
}
});
}
protected ActivitiesInfo getActivities(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<ActivitiesInfo>() {
@Override
public ActivitiesInfo run() throws Exception {
return routerWebService.getActivities(null, null);
}
});
}
protected AppActivitiesInfo getAppActivities(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<AppActivitiesInfo>() {
@Override
public AppActivitiesInfo run() throws Exception {
return routerWebService.getAppActivities(null, null, null);
}
});
}
protected ApplicationStatisticsInfo getAppStatistics(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<ApplicationStatisticsInfo>() {
@Override
public ApplicationStatisticsInfo run() throws Exception {
return routerWebService.getAppStatistics(null, null, null);
}
});
}
protected AppInfo getApp(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<AppInfo>() {
@Override
public AppInfo run() throws Exception {
return routerWebService.getApp(null, null, null);
}
});
}
protected AppState getAppState(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<AppState>() {
@Override
public AppState run() throws Exception {
return routerWebService.getAppState(null, null);
}
});
}
protected Response updateAppState(String user) throws AuthorizationException,
YarnException, InterruptedException, IOException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.updateAppState(null, null, null);
}
});
}
protected NodeToLabelsInfo getNodeToLabels(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<NodeToLabelsInfo>() {
@Override
public NodeToLabelsInfo run() throws Exception {
return routerWebService.getNodeToLabels(null);
}
});
}
protected LabelsToNodesInfo getLabelsToNodes(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<LabelsToNodesInfo>() {
@Override
public LabelsToNodesInfo run() throws Exception {
return routerWebService.getLabelsToNodes(null);
}
});
}
protected Response replaceLabelsOnNodes(String user) throws Exception {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.replaceLabelsOnNodes(null, null);
}
});
}
protected Response replaceLabelsOnNode(String user) throws Exception {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.replaceLabelsOnNode(null, null, null);
}
});
}
protected NodeLabelsInfo getClusterNodeLabels(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<NodeLabelsInfo>() {
@Override
public NodeLabelsInfo run() throws Exception {
return routerWebService.getClusterNodeLabels(null);
}
});
}
protected Response addToClusterNodeLabels(String user) throws Exception {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.addToClusterNodeLabels(null, null);
}
});
}
protected Response removeFromCluserNodeLabels(String user) throws Exception {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.removeFromCluserNodeLabels(null, null);
}
});
}
protected NodeLabelsInfo getLabelsOnNode(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<NodeLabelsInfo>() {
@Override
public NodeLabelsInfo run() throws Exception {
return routerWebService.getLabelsOnNode(null, null);
}
});
}
protected AppPriority getAppPriority(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<AppPriority>() {
@Override
public AppPriority run() throws Exception {
return routerWebService.getAppPriority(null, null);
}
});
}
protected Response updateApplicationPriority(String user)
throws AuthorizationException, YarnException, InterruptedException,
IOException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.updateApplicationPriority(null, null, null);
}
});
}
protected AppQueue getAppQueue(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<AppQueue>() {
@Override
public AppQueue run() throws Exception {
return routerWebService.getAppQueue(null, null);
}
});
}
protected Response updateAppQueue(String user) throws AuthorizationException,
YarnException, InterruptedException, IOException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.updateAppQueue(null, null, null);
}
});
}
protected Response createNewApplication(String user)
throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.createNewApplication(null);
}
});
}
protected Response submitApplication(String user)
throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.submitApplication(null, null);
}
});
}
protected Response postDelegationToken(String user)
throws AuthorizationException, IOException, InterruptedException,
Exception {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.postDelegationToken(null, null);
}
});
}
protected Response postDelegationTokenExpiration(String user)
throws AuthorizationException, IOException, Exception {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.postDelegationTokenExpiration(null);
}
});
}
protected Response cancelDelegationToken(String user)
throws AuthorizationException, IOException, InterruptedException,
Exception {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.cancelDelegationToken(null);
}
});
}
protected Response createNewReservation(String user)
throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.createNewReservation(null);
}
});
}
protected Response submitReservation(String user)
throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.submitReservation(null, null);
}
});
}
protected Response updateReservation(String user)
throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.updateReservation(null, null);
}
});
}
protected Response deleteReservation(String user)
throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.deleteReservation(null, null);
}
});
}
protected Response listReservation(String user) throws Exception {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.listReservation(null, null, 0, 0, false,
null);
}
});
}
protected AppTimeoutInfo getAppTimeout(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<AppTimeoutInfo>() {
@Override
public AppTimeoutInfo run() throws Exception {
return routerWebService.getAppTimeout(null, null, null);
}
});
}
protected AppTimeoutsInfo getAppTimeouts(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<AppTimeoutsInfo>() {
@Override
public AppTimeoutsInfo run() throws Exception {
return routerWebService.getAppTimeouts(null, null);
}
});
}
protected Response updateApplicationTimeout(String user)
throws AuthorizationException, YarnException, InterruptedException,
IOException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws Exception {
return routerWebService.updateApplicationTimeout(null, null, null);
}
});
}
protected AppAttemptsInfo getAppAttempts(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<AppAttemptsInfo>() {
@Override
public AppAttemptsInfo run() throws Exception {
return routerWebService.getAppAttempts(null, null);
}
});
}
protected AppAttemptInfo getAppAttempt(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<AppAttemptInfo>() {
@Override
public AppAttemptInfo run() throws Exception {
return routerWebService.getAppAttempt(null, null, null, null);
}
});
}
protected ContainersInfo getContainers(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<ContainersInfo>() {
@Override
public ContainersInfo run() throws Exception {
return routerWebService.getContainers(null, null, null, null);
}
});
}
protected ContainerInfo getContainer(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<ContainerInfo>() {
@Override
public ContainerInfo run() throws Exception {
return routerWebService.getContainer(null, null, null, null, null);
}
});
}
protected RequestInterceptorChainWrapper getInterceptorChain(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<RequestInterceptorChainWrapper>() {
@Override
public RequestInterceptorChainWrapper run() throws Exception {
return routerWebService.getInterceptorChain();
}
});
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.File;
import java.io.IOException;
/**
* Helper class to start a new process.
*/
public class JavaProcess {
private Process process = null;
public JavaProcess(Class<?> klass) throws IOException, InterruptedException {
String javaHome = System.getProperty("java.home");
String javaBin =
javaHome + File.separator + "bin" + File.separator + "java";
String classpath = System.getProperty("java.class.path");
classpath = classpath.concat("./src/test/resources");
String className = klass.getCanonicalName();
ProcessBuilder builder =
new ProcessBuilder(javaBin, "-cp", classpath, className);
builder.inheritIO();
process = builder.start();
}
public void stop() throws InterruptedException {
if (process != null) {
process.destroy();
process.waitFor();
process.exitValue();
}
}
}

View File

@ -0,0 +1,340 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
/**
* This class mocks the RESTRequestInterceptor.
*/
public class MockRESTRequestInterceptor extends AbstractRESTRequestInterceptor {
@Override
public void setNextInterceptor(RESTRequestInterceptor next) {
throw new YarnRuntimeException(
"setNextInterceptor is being called on MockRESTRequestInterceptor,"
+ "which should be the last one in the chain. "
+ "Check if the interceptor pipeline configuration is correct");
}
@Override
public ClusterInfo get() {
return new ClusterInfo();
}
@Override
public ClusterInfo getClusterInfo() {
return new ClusterInfo();
}
@Override
public ClusterMetricsInfo getClusterMetricsInfo() {
return new ClusterMetricsInfo();
}
@Override
public SchedulerTypeInfo getSchedulerInfo() {
return new SchedulerTypeInfo();
}
@Override
public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
throws IOException {
return "Done";
}
@Override
public NodesInfo getNodes(String states) {
return new NodesInfo();
}
@Override
public NodeInfo getNode(String nodeId) {
return new NodeInfo();
}
@SuppressWarnings("checkstyle:parameternumber")
@Override
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
Set<String> statesQuery, String finalStatusQuery, String userQuery,
String queueQuery, String count, String startedBegin, String startedEnd,
String finishBegin, String finishEnd, Set<String> applicationTypes,
Set<String> applicationTags, Set<String> unselectedFields) {
return new AppsInfo();
}
@Override
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
return new ActivitiesInfo();
}
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time) {
return new AppActivitiesInfo();
}
@Override
public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
Set<String> stateQueries, Set<String> typeQueries) {
return new ApplicationStatisticsInfo();
}
@Override
public AppInfo getApp(HttpServletRequest hsr, String appId,
Set<String> unselectedFields) {
return new AppInfo();
}
@Override
public AppState getAppState(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return new AppState();
}
@Override
public Response updateAppState(AppState targetState, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
return Response.status(Status.OK).build();
}
@Override
public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
throws IOException {
return new NodeToLabelsInfo();
}
@Override
public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
throws IOException {
return new LabelsToNodesInfo();
}
@Override
public Response replaceLabelsOnNodes(NodeToLabelsEntryList newNodeToLabels,
HttpServletRequest hsr) throws Exception {
return Response.status(Status.OK).build();
}
@Override
public Response replaceLabelsOnNode(Set<String> newNodeLabelsName,
HttpServletRequest hsr, String nodeId) throws Exception {
return Response.status(Status.OK).build();
}
@Override
public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr)
throws IOException {
return new NodeLabelsInfo();
}
@Override
public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
HttpServletRequest hsr) throws Exception {
return Response.status(Status.OK).build();
}
@Override
public Response removeFromCluserNodeLabels(Set<String> oldNodeLabels,
HttpServletRequest hsr) throws Exception {
return Response.status(Status.OK).build();
}
@Override
public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
throws IOException {
return new NodeLabelsInfo();
}
@Override
public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return new AppPriority();
}
@Override
public Response updateApplicationPriority(AppPriority targetPriority,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
return Response.status(Status.OK).build();
}
@Override
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return new AppQueue();
}
@Override
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
return Response.status(Status.OK).build();
}
@Override
public Response createNewApplication(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return Response.status(Status.OK).build();
}
@Override
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return Response.status(Status.OK).build();
}
@Override
public Response postDelegationToken(DelegationToken tokenData,
HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException, Exception {
return Response.status(Status.OK).build();
}
@Override
public Response postDelegationTokenExpiration(HttpServletRequest hsr)
throws AuthorizationException, IOException, Exception {
return Response.status(Status.OK).build();
}
@Override
public Response cancelDelegationToken(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
return Response.status(Status.OK).build();
}
@Override
public Response createNewReservation(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return Response.status(Status.OK).build();
}
@Override
public Response submitReservation(ReservationSubmissionRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return Response.status(Status.OK).build();
}
@Override
public Response updateReservation(ReservationUpdateRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return Response.status(Status.OK).build();
}
@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return Response.status(Status.OK).build();
}
@Override
public Response listReservation(String queue, String reservationId,
long startTime, long endTime, boolean includeResourceAllocations,
HttpServletRequest hsr) throws Exception {
return Response.status(Status.OK).build();
}
@Override
public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId,
String type) throws AuthorizationException {
return new AppTimeoutInfo();
}
@Override
public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return new AppTimeoutsInfo();
}
@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
return Response.status(Status.OK).build();
}
@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
return new AppAttemptsInfo();
}
@Override
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
return new AppAttemptInfo();
}
@Override
public ContainersInfo getContainers(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
return new ContainersInfo();
}
@Override
public ContainerInfo getContainer(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId,
String containerId) {
return new ContainerInfo();
}
}

View File

@ -0,0 +1,339 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
/**
* Mock intercepter that does not do anything other than forwarding it to the
* next intercepter in the chain.
*/
public class PassThroughRESTRequestInterceptor
extends AbstractRESTRequestInterceptor {
@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
return getNextInterceptor().getAppAttempts(hsr, appId);
}
@Override
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
return getNextInterceptor().getAppAttempt(req, res, appId, appAttemptId);
}
@Override
public ContainersInfo getContainers(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
return getNextInterceptor().getContainers(req, res, appId, appAttemptId);
}
@Override
public ContainerInfo getContainer(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId,
String containerId) {
return getNextInterceptor().getContainer(req, res, appId, appAttemptId,
containerId);
}
@Override
public ClusterInfo get() {
return getNextInterceptor().get();
}
@Override
public ClusterInfo getClusterInfo() {
return getNextInterceptor().getClusterInfo();
}
@Override
public ClusterMetricsInfo getClusterMetricsInfo() {
return getNextInterceptor().getClusterMetricsInfo();
}
@Override
public SchedulerTypeInfo getSchedulerInfo() {
return getNextInterceptor().getSchedulerInfo();
}
@Override
public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
throws IOException {
return getNextInterceptor().dumpSchedulerLogs(time, hsr);
}
@Override
public NodesInfo getNodes(String states) {
return getNextInterceptor().getNodes(states);
}
@Override
public NodeInfo getNode(String nodeId) {
return getNextInterceptor().getNode(nodeId);
}
@Override
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
Set<String> statesQuery, String finalStatusQuery, String userQuery,
String queueQuery, String count, String startedBegin, String startedEnd,
String finishBegin, String finishEnd, Set<String> applicationTypes,
Set<String> applicationTags, Set<String> unselectedFields) {
return getNextInterceptor().getApps(hsr, stateQuery, statesQuery,
finalStatusQuery, userQuery, queueQuery, count, startedBegin,
startedEnd, finishBegin, finishEnd, applicationTypes, applicationTags,
unselectedFields);
}
@Override
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
return getNextInterceptor().getActivities(hsr, nodeId);
}
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time) {
return getNextInterceptor().getAppActivities(hsr, appId, time);
}
@Override
public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
Set<String> stateQueries, Set<String> typeQueries) {
return getNextInterceptor().getAppStatistics(hsr, stateQueries,
typeQueries);
}
@Override
public AppInfo getApp(HttpServletRequest hsr, String appId,
Set<String> unselectedFields) {
return getNextInterceptor().getApp(hsr, appId, unselectedFields);
}
@Override
public AppState getAppState(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return getNextInterceptor().getAppState(hsr, appId);
}
@Override
public Response updateAppState(AppState targetState, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
return getNextInterceptor().updateAppState(targetState, hsr, appId);
}
@Override
public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
throws IOException {
return getNextInterceptor().getNodeToLabels(hsr);
}
@Override
public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
throws IOException {
return getNextInterceptor().getLabelsToNodes(labels);
}
@Override
public Response replaceLabelsOnNodes(NodeToLabelsEntryList newNodeToLabels,
HttpServletRequest hsr) throws Exception {
return getNextInterceptor().replaceLabelsOnNodes(newNodeToLabels, hsr);
}
@Override
public Response replaceLabelsOnNode(Set<String> newNodeLabelsName,
HttpServletRequest hsr, String nodeId) throws Exception {
return getNextInterceptor().replaceLabelsOnNode(newNodeLabelsName, hsr,
nodeId);
}
@Override
public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr)
throws IOException {
return getNextInterceptor().getClusterNodeLabels(hsr);
}
@Override
public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
HttpServletRequest hsr) throws Exception {
return getNextInterceptor().addToClusterNodeLabels(newNodeLabels, hsr);
}
@Override
public Response removeFromCluserNodeLabels(Set<String> oldNodeLabels,
HttpServletRequest hsr) throws Exception {
return getNextInterceptor().removeFromCluserNodeLabels(oldNodeLabels, hsr);
}
@Override
public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
throws IOException {
return getNextInterceptor().getLabelsOnNode(hsr, nodeId);
}
@Override
public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return getNextInterceptor().getAppPriority(hsr, appId);
}
@Override
public Response updateApplicationPriority(AppPriority targetPriority,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
return getNextInterceptor().updateApplicationPriority(targetPriority, hsr,
appId);
}
@Override
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return getNextInterceptor().getAppQueue(hsr, appId);
}
@Override
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
return getNextInterceptor().updateAppQueue(targetQueue, hsr, appId);
}
@Override
public Response createNewApplication(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return getNextInterceptor().createNewApplication(hsr);
}
@Override
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return getNextInterceptor().submitApplication(newApp, hsr);
}
@Override
public Response postDelegationToken(DelegationToken tokenData,
HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException, Exception {
return getNextInterceptor().postDelegationToken(tokenData, hsr);
}
@Override
public Response postDelegationTokenExpiration(HttpServletRequest hsr)
throws AuthorizationException, IOException, Exception {
return getNextInterceptor().postDelegationTokenExpiration(hsr);
}
@Override
public Response cancelDelegationToken(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
return getNextInterceptor().cancelDelegationToken(hsr);
}
@Override
public Response createNewReservation(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return getNextInterceptor().createNewReservation(hsr);
}
@Override
public Response submitReservation(ReservationSubmissionRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return getNextInterceptor().submitReservation(resContext, hsr);
}
@Override
public Response updateReservation(ReservationUpdateRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return getNextInterceptor().updateReservation(resContext, hsr);
}
@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
return getNextInterceptor().deleteReservation(resContext, hsr);
}
@Override
public Response listReservation(String queue, String reservationId,
long startTime, long endTime, boolean includeResourceAllocations,
HttpServletRequest hsr) throws Exception {
return getNextInterceptor().listReservation(queue, reservationId, startTime,
endTime, includeResourceAllocations, hsr);
}
@Override
public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId,
String type) throws AuthorizationException {
return getNextInterceptor().getAppTimeout(hsr, appId, type);
}
@Override
public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
throws AuthorizationException {
return getNextInterceptor().getAppTimeouts(hsr, appId);
}
@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
return getNextInterceptor().updateApplicationTimeout(appTimeout, hsr,
appId);
}
}

View File

@ -0,0 +1,269 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.webapp.RouterWebServices.RequestInterceptorChainWrapper;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.junit.Assert;
import org.junit.Test;
/**
* Test class to validate the WebService interceptor model inside the Router.
*/
public class TestRouterWebServices extends BaseRouterWebServicesTest {
private String user = "test1";
/**
* Test that all requests get forwarded to the last interceptor in the chain
* get back the responses.
*/
@Test
public void testRouterWebServicesE2E() throws Exception {
ClusterInfo clusterInfo = get(user);
Assert.assertNotNull(clusterInfo);
ClusterInfo clusterInfo2 = getClusterInfo(user);
Assert.assertNotNull(clusterInfo2);
ClusterMetricsInfo clusterMetricsInfo = getClusterMetricsInfo(user);
Assert.assertNotNull(clusterMetricsInfo);
SchedulerTypeInfo schedulerTypeInfo = getSchedulerInfo(user);
Assert.assertNotNull(schedulerTypeInfo);
String dumpResult = dumpSchedulerLogs(user);
Assert.assertNotNull(dumpResult);
NodesInfo nodesInfo = getNodes(user);
Assert.assertNotNull(nodesInfo);
NodeInfo nodeInfo = getNode(user);
Assert.assertNotNull(nodeInfo);
AppsInfo appsInfo = getApps(user);
Assert.assertNotNull(appsInfo);
ActivitiesInfo activitiesInfo = getActivities(user);
Assert.assertNotNull(activitiesInfo);
AppActivitiesInfo appActiviesInfo = getAppActivities(user);
Assert.assertNotNull(appActiviesInfo);
ApplicationStatisticsInfo applicationStatisticsInfo =
getAppStatistics(user);
Assert.assertNotNull(applicationStatisticsInfo);
AppInfo appInfo = getApp(user);
Assert.assertNotNull(appInfo);
AppState appState = getAppState(user);
Assert.assertNotNull(appState);
Response response = updateAppState(user);
Assert.assertNotNull(response);
NodeToLabelsInfo nodeToLabelsInfo = getNodeToLabels(user);
Assert.assertNotNull(nodeToLabelsInfo);
LabelsToNodesInfo labelsToNodesInfo = getLabelsToNodes(user);
Assert.assertNotNull(labelsToNodesInfo);
Response response2 = replaceLabelsOnNodes(user);
Assert.assertNotNull(response2);
Response response3 = replaceLabelsOnNode(user);
Assert.assertNotNull(response3);
NodeLabelsInfo nodeLabelsInfo = getClusterNodeLabels(user);
Assert.assertNotNull(nodeLabelsInfo);
Response response4 = addToClusterNodeLabels(user);
Assert.assertNotNull(response4);
Response response5 = removeFromCluserNodeLabels(user);
Assert.assertNotNull(response5);
NodeLabelsInfo nodeLabelsInfo2 = getLabelsOnNode(user);
Assert.assertNotNull(nodeLabelsInfo2);
AppPriority appPriority = getAppPriority(user);
Assert.assertNotNull(appPriority);
Response response6 = updateApplicationPriority(user);
Assert.assertNotNull(response6);
AppQueue appQueue = getAppQueue(user);
Assert.assertNotNull(appQueue);
Response response7 = updateAppQueue(user);
Assert.assertNotNull(response7);
Response response8 = createNewApplication(user);
Assert.assertNotNull(response8);
Response response9 = submitApplication(user);
Assert.assertNotNull(response9);
Response response10 = postDelegationToken(user);
Assert.assertNotNull(response10);
Response response11 = postDelegationTokenExpiration(user);
Assert.assertNotNull(response11);
Response response12 = cancelDelegationToken(user);
Assert.assertNotNull(response12);
Response response13 = createNewReservation(user);
Assert.assertNotNull(response13);
Response response14 = submitReservation(user);
Assert.assertNotNull(response14);
Response response15 = updateReservation(user);
Assert.assertNotNull(response15);
Response response16 = deleteReservation(user);
Assert.assertNotNull(response16);
Response response17 = listReservation(user);
Assert.assertNotNull(response17);
AppTimeoutInfo appTimeoutInfo = getAppTimeout(user);
Assert.assertNotNull(appTimeoutInfo);
AppTimeoutsInfo appTimeoutsInfo = getAppTimeouts(user);
Assert.assertNotNull(appTimeoutsInfo);
Response response18 = updateApplicationTimeout(user);
Assert.assertNotNull(response18);
AppAttemptsInfo appAttemptsInfo = getAppAttempts(user);
Assert.assertNotNull(appAttemptsInfo);
AppAttemptInfo appAttemptInfo = getAppAttempt(user);
Assert.assertNotNull(appAttemptInfo);
ContainersInfo containersInfo = getContainers(user);
Assert.assertNotNull(containersInfo);
ContainerInfo containerInfo = getContainer(user);
Assert.assertNotNull(containerInfo);
}
/**
* Tests if the pipeline is created properly.
*/
@Test
public void testRequestInterceptorChainCreation() throws Exception {
RESTRequestInterceptor root =
super.getRouterWebServices().createRequestInterceptorChain();
int index = 0;
while (root != null) {
// The current pipeline is:
// PassThroughRESTRequestInterceptor - index = 0
// PassThroughRESTRequestInterceptor - index = 1
// PassThroughRESTRequestInterceptor - index = 2
// MockRESTRequestInterceptor - index = 3
switch (index) {
case 0: // Fall to the next case
case 1: // Fall to the next case
case 2:
// If index is equal to 0,1 or 2 we fall in this check
Assert.assertEquals(PassThroughRESTRequestInterceptor.class.getName(),
root.getClass().getName());
break;
case 3:
Assert.assertEquals(MockRESTRequestInterceptor.class.getName(),
root.getClass().getName());
break;
default:
Assert.fail();
}
root = root.getNextInterceptor();
index++;
}
Assert.assertEquals("The number of interceptors in chain does not match", 4,
index);
}
/**
* Test if the different chains for users are generated, and LRU cache is
* working as expected.
*/
@Test
public void testUsersChainMapWithLRUCache()
throws YarnException, IOException, InterruptedException {
getInterceptorChain("test1");
getInterceptorChain("test2");
getInterceptorChain("test3");
getInterceptorChain("test4");
getInterceptorChain("test5");
getInterceptorChain("test6");
getInterceptorChain("test7");
getInterceptorChain("test8");
Map<String, RequestInterceptorChainWrapper> pipelines =
getRouterWebServices().getPipelines();
Assert.assertEquals(8, pipelines.size());
getInterceptorChain("test9");
getInterceptorChain("test10");
getInterceptorChain("test1");
getInterceptorChain("test11");
// The cache max size is defined in TEST_MAX_CACHE_SIZE
Assert.assertEquals(10, pipelines.size());
RequestInterceptorChainWrapper chain = pipelines.get("test1");
Assert.assertNotNull("test1 should not be evicted", chain);
chain = pipelines.get("test2");
Assert.assertNull("test2 should have been evicted", chain);
}
}

View File

@ -0,0 +1,111 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>
<property>
<name>yarn.scheduler.capacity.maximum-applications</name>
<value>10000</value>
<description>
Maximum number of applications that can be pending and running.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
<value>0.1</value>
<description>
Maximum percent of resources in the cluster which can be used to run
application masters i.e. controls number of concurrent running
applications.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
<description>
The ResourceCalculator implementation to be used to compare
Resources in the scheduler.
The default i.e. DefaultResourceCalculator only uses Memory while
DominantResourceCalculator uses dominant-resource to compare
multi-dimensional resources such as Memory, CPU etc.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default</value>
<description>
The queues at the this level (root is the root queue).
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>100</value>
<description>Default queue target capacity.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
<value>1</value>
<description>
Default queue user limit a percentage from 0.0 to 1.0.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>100</value>
<description>
The maximum capacity of the default queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.state</name>
<value>RUNNING</value>
<description>
The state of the default queue. State can be one of RUNNING or STOPPED.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.acl_submit_applications</name>
<value>*</value>
<description>
The ACL of who can submit jobs to the default queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
<value>*</value>
<description>
The ACL of who can administer jobs on the default queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.node-locality-delay</name>
<value>-1</value>
<description>
Number of missed scheduling opportunities after which the CapacityScheduler
attempts to schedule rack-local containers.
Typically this should be set to number of racks in the cluster, this
feature is disabled by default, set to -1.
</description>
</property>
</configuration>

View File

@ -0,0 +1,19 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# log4j configuration used during build and unit tests
log4j.rootLogger=info,stdout
log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n

View File

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>
<property>
<name>yarn.resourcemanager.reservation-system.enable</name>
<value>true</value>
</property>
<property>
<name>yarn.node-labels.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>0.0.0.0:8080</value>
</property>
</configuration>