YARN-2427. Added the API of moving apps between queues in RM web services. Contributed by Varun Vasudev.

This commit is contained in:
Zhijie Shen 2015-01-06 14:32:09 -08:00
parent dd57c2047b
commit 60103fca04
7 changed files with 491 additions and 20 deletions

View File

@ -73,6 +73,9 @@ Release 2.7.0 - UNRELEASED
YARN-2881. [YARN-2574] Implement PlanFollower for FairScheduler.
(Anubhav Dhoot via kasha)
YARN-2427. Added the API of moving apps between queues in RM web services.
(Varun Vasudev via zjshen)
IMPROVEMENTS
YARN-2950. Change message to mandate, not suggest JS requirement on UI.

View File

@ -58,7 +58,7 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
final Class[] rootUnwrappedTypes =
{ NewApplication.class, ApplicationSubmissionContextInfo.class,
ContainerLaunchContextInfo.class, LocalResourceInfo.class,
DelegationToken.class };
DelegationToken.class, AppQueue.class };
this.typesContextMap = new HashMap<Class, JAXBContext>();
context =

View File

@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -117,6 +118,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
@ -696,7 +698,7 @@ public class RMWebServices {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
"UNKNOWN", "RMWebService", "Trying to kill/move an absent application "
"UNKNOWN", "RMWebService", "Trying to kill an absent application "
+ appId);
throw e;
}
@ -945,6 +947,126 @@ public class RMWebServices {
return Response.status(Status.OK).entity(ret).build();
}
@GET
@Path("/apps/{appid}/queue")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public AppQueue getAppQueue(@Context HttpServletRequest hsr,
@PathParam("appid") String appId) throws AuthorizationException {
init();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
String userName = "UNKNOWN-USER";
if (callerUGI != null) {
userName = callerUGI.getUserName();
}
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
"UNKNOWN", "RMWebService",
"Trying to get state of an absent application " + appId);
throw e;
}
AppQueue ret = new AppQueue();
ret.setQueue(app.getQueue());
return ret;
}
@PUT
@Path("/apps/{appid}/queue")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response updateAppQueue(AppQueue targetQueue,
@Context HttpServletRequest hsr, @PathParam("appid") String appId)
throws AuthorizationException, YarnException, InterruptedException,
IOException {
init();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI == null) {
String msg = "Unable to obtain user name, user not authenticated";
throw new AuthorizationException(msg);
}
if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
String msg = "The default static user cannot carry out this operation.";
return Response.status(Status.FORBIDDEN).entity(msg).build();
}
String userName = callerUGI.getUserName();
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
"UNKNOWN", "RMWebService", "Trying to move an absent application "
+ appId);
throw e;
}
if (!app.getQueue().equals(targetQueue.getQueue())) {
// user is attempting to change queue.
return moveApp(app, callerUGI, targetQueue.getQueue());
}
AppQueue ret = new AppQueue();
ret.setQueue(app.getQueue());
return Response.status(Status.OK).entity(ret).build();
}
protected Response moveApp(RMApp app, UserGroupInformation callerUGI,
String targetQueue) throws IOException, InterruptedException {
if (app == null) {
throw new IllegalArgumentException("app cannot be null");
}
String userName = callerUGI.getUserName();
final ApplicationId appid = app.getApplicationId();
final String reqTargetQueue = targetQueue;
try {
callerUGI
.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException,
YarnException {
MoveApplicationAcrossQueuesRequest req =
MoveApplicationAcrossQueuesRequest.newInstance(appid,
reqTargetQueue);
rm.getClientRMService().moveApplicationAcrossQueues(req);
return null;
}
});
} catch (UndeclaredThrowableException ue) {
// if the root cause is a permissions issue
// bubble that up to the user
if (ue.getCause() instanceof YarnException) {
YarnException ye = (YarnException) ue.getCause();
if (ye.getCause() instanceof AccessControlException) {
String appId = app.getApplicationId().toString();
String msg =
"Unauthorized attempt to move appid " + appId
+ " by remote user " + userName;
return Response.status(Status.FORBIDDEN).entity(msg).build();
} else if (ye.getMessage().startsWith("App in")
&& ye.getMessage().endsWith("state cannot be moved.")) {
return Response.status(Status.BAD_REQUEST).entity(ye.getMessage())
.build();
} else {
throw ue;
}
} else {
throw ue;
}
}
AppQueue ret = new AppQueue();
ret.setQueue(app.getQueue());
return Response.status(Status.OK).entity(ret).build();
}
private RMApp getRMAppForAppId(String appId) {
if (appId == null || appId.isEmpty()) {

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "appqueue")
@XmlAccessorType(XmlAccessType.FIELD)
public class AppQueue {
String queue;
public AppQueue() {
}
public AppQueue(String queue) {
this.queue = queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public String getQueue() {
return this.queue;
}
}

View File

@ -130,6 +130,7 @@ public class TestFifoScheduler {
Assert.fail("NPE when allocating container on node but "
+ "forget to set off-switch request should be handled");
}
rm.stop();
}
@Test

View File

@ -43,7 +43,6 @@ import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
@ -72,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -199,6 +200,9 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
out.println(" <queue name=\"default\">");
out.println(" <aclAdministerApps>someuser </aclAdministerApps>");
out.println(" </queue>");
out.println(" <queue name=\"test\">");
out.println(" <aclAdministerApps>someuser </aclAdministerApps>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
@ -358,7 +362,7 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
new AppState(YarnApplicationState.KILLED.toString());
Object entity;
if (contentType == MediaType.APPLICATION_JSON_TYPE) {
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
entity = appStateToJSON(targetState);
} else {
entity = targetState;
@ -439,7 +443,7 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
ClientResponse response;
AppState targetState = new AppState(targetStateString);
Object entity;
if (contentType == MediaType.APPLICATION_JSON_TYPE) {
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
entity = appStateToJSON(targetState);
} else {
entity = targetState;
@ -555,7 +559,6 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
validateResponseStatus(response, Status.FORBIDDEN);
}
rm.stop();
}
@Test
@ -736,20 +739,19 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
String appType = "test-type";
String urlPath = "apps";
String appId = testGetNewApplication(acceptMedia);
List<String> commands = new ArrayList<String>();
List<String> commands = new ArrayList<>();
commands.add("/bin/sleep 5");
HashMap<String, String> environment = new HashMap<String, String>();
HashMap<String, String> environment = new HashMap<>();
environment.put("APP_VAR", "ENV_SETTING");
HashMap<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>();
HashMap<ApplicationAccessType, String> acls = new HashMap<>();
acls.put(ApplicationAccessType.MODIFY_APP, "testuser1, testuser2");
acls.put(ApplicationAccessType.VIEW_APP, "testuser3, testuser4");
Set<String> tags = new HashSet<String>();
Set<String> tags = new HashSet<>();
tags.add("tag1");
tags.add("tag 2");
CredentialsInfo credentials = new CredentialsInfo();
HashMap<String, String> tokens = new HashMap<String, String>();
HashMap<String, String> secrets = new HashMap<String, String>();
HashMap<String, String> tokens = new HashMap<>();
HashMap<String, String> secrets = new HashMap<>();
secrets.put("secret1", Base64.encodeBase64String(
"mysecret".getBytes("UTF8")));
credentials.setSecrets(secrets);
@ -761,8 +763,7 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
appInfo.setMaxAppAttempts(2);
appInfo.setQueue(queueName);
appInfo.setApplicationType(appType);
HashMap<String, LocalResourceInfo> lr =
new HashMap<String, LocalResourceInfo>();
HashMap<String, LocalResourceInfo> lr = new HashMap<>();
LocalResourceInfo y = new LocalResourceInfo();
y.setUrl(new URI("http://www.test.com/file.txt"));
y.setSize(100);
@ -911,8 +912,7 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
appInfo.setMaxAppAttempts(2);
appInfo.setQueue("testqueue");
appInfo.setApplicationType("test-type");
HashMap<String, LocalResourceInfo> lr =
new HashMap<String, LocalResourceInfo>();
HashMap<String, LocalResourceInfo> lr = new HashMap<>();
LocalResourceInfo y = new LocalResourceInfo();
y.setUrl(new URI("http://www.test.com/file.txt"));
y.setSize(100);
@ -939,4 +939,150 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
rm.stop();
}
@Test
public void testGetAppQueue() throws Exception {
client().addFilter(new LoggingFilter(System.out));
boolean isCapacityScheduler =
rm.getResourceScheduler() instanceof CapacityScheduler;
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] contentTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
for (String contentType : contentTypes) {
RMApp app = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
amNodeManager.nodeHeartbeat(true);
ClientResponse response =
this
.constructWebResource("apps", app.getApplicationId().toString(),
"queue").accept(contentType).get(ClientResponse.class);
assertEquals(Status.OK, response.getClientResponseStatus());
String expectedQueue = "default";
if(!isCapacityScheduler) {
expectedQueue = "root." + webserviceUserName;
}
if (contentType.equals(MediaType.APPLICATION_JSON)) {
verifyAppQueueJson(response, expectedQueue);
} else {
verifyAppQueueXML(response, expectedQueue);
}
}
rm.stop();
}
@Test(timeout = 90000)
public void testAppMove() throws Exception {
client().addFilter(new LoggingFilter(System.out));
boolean isCapacityScheduler =
rm.getResourceScheduler() instanceof CapacityScheduler;
// default root queue allows anyone to have admin acl
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
String[] queues = { "default", "test" };
csconf.setQueues("root", queues);
csconf.setCapacity("root.default", 50.0f);
csconf.setCapacity("root.test", 50.0f);
csconf.setAcl("root", QueueACL.ADMINISTER_QUEUE, "someuser");
csconf.setAcl("root.default", QueueACL.ADMINISTER_QUEUE, "someuser");
csconf.setAcl("root.test", QueueACL.ADMINISTER_QUEUE, "someuser");
rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) {
RMApp app = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
amNodeManager.nodeHeartbeat(true);
AppQueue targetQueue = new AppQueue("test");
Object entity;
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
entity = appQueueToJSON(targetQueue);
} else {
entity = targetQueue;
}
ClientResponse response =
this
.constructWebResource("apps", app.getApplicationId().toString(),
"queue").entity(entity, contentType).accept(mediaType)
.put(ClientResponse.class);
if (!isAuthenticationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
continue;
}
assertEquals(Status.OK, response.getClientResponseStatus());
String expectedQueue = "test";
if(!isCapacityScheduler) {
expectedQueue = "root.test";
}
if (mediaType.equals(MediaType.APPLICATION_JSON)) {
verifyAppQueueJson(response, expectedQueue);
} else {
verifyAppQueueXML(response, expectedQueue);
}
Assert.assertEquals(expectedQueue, app.getQueue());
// check unauthorized
app = rm.submitApp(CONTAINER_MB, "", "someuser");
amNodeManager.nodeHeartbeat(true);
response =
this
.constructWebResource("apps", app.getApplicationId().toString(),
"queue").entity(entity, contentType).accept(mediaType)
.put(ClientResponse.class);
assertEquals(Status.FORBIDDEN, response.getClientResponseStatus());
if(isCapacityScheduler) {
Assert.assertEquals("default", app.getQueue());
}
else {
Assert.assertEquals("root.someuser", app.getQueue());
}
}
}
rm.stop();
}
protected static String appQueueToJSON(AppQueue targetQueue) throws Exception {
StringWriter sw = new StringWriter();
JSONJAXBContext ctx = new JSONJAXBContext(AppQueue.class);
JSONMarshaller jm = ctx.createJSONMarshaller();
jm.marshallToJSON(targetQueue, sw);
return sw.toString();
}
protected static void
verifyAppQueueJson(ClientResponse response, String queue)
throws JSONException {
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
String responseQueue = json.getString("queue");
assertEquals(queue, responseQueue);
}
protected static void
verifyAppQueueXML(ClientResponse response, String queue)
throws ParserConfigurationException, IOException, SAXException {
assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
String xml = response.getEntity(String.class);
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("appqueue");
assertEquals("incorrect number of elements", 1, nodes.getLength());
Element element = (Element) nodes.item(0);
String responseQueue = WebServicesTestUtils.getXmlString(element, "queue");
assertEquals(queue, responseQueue);
}
}

View File

@ -2595,7 +2595,7 @@ Server: Jetty(6.1.26)
+---+
HTTP/1.1 202 Accepted
Content-Type: application/json
Content-Type: application/xml
Content-Length: 794
Location: http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003
Server: Jetty(6.1.26)
@ -2664,8 +2664,6 @@ Server: Jetty(6.1.26)
+---+
HTTP/1.1 403 Unauthorized
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
+---+
@ -2708,6 +2706,161 @@ Server: Jetty(6.1.26)
+---+
* Cluster Application Queue API
With the application queue API, you can query the queue of a submitted app as well move a running app to another queue using a PUT request specifying the target queue. To perform the PUT operation, authentication has to be setup for the RM web services. In addition, you must be authorized to move the app. Currently you can only move the app if you're using the Capacity scheduler or the Fair scheduler.
Please note that in order to move an app, you must have an authentication filter setup for the HTTP interface. The functionality requires that a username is set in the HttpServletRequest. If no filter is setup, the response will be an "UNAUTHORIZED" response.
This feature is currently in the alpha stage and may change in the future.
** URI
-----
* http://<rm http address:port>/ws/v1/cluster/apps/{appid}/queue
-----
** HTTP Operations Supported
------
* GET
* PUT
------
** Query Parameters Supported
------
None
------
** Elements of <appqueue> object
When you make a request for the state of an app, the information returned has the following fields
*---------------+--------------+-------------------------------+
|| Item || Data Type || Description |
*---------------+--------------+-------------------------------+
| queue | string | The application queue |
*---------------+--------------+--------------------------------+
** Response Examples
<<JSON responses>>
HTTP Request
-----
GET http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/queue
-----
Response Header:
+---+
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
+---+
Response Body:
+---+
{
"queue":"default"
}
+---+
HTTP Request
-----
PUT http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/queue
----
Request Body:
+---+
{
"queue":"test"
}
+---+
Response Header:
+---+
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
+---+
Response Body:
+---+
{
"queue":"test"
}
+---+
<<XML responses>>
HTTP Request
-----
GET http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/queue
-----
Response Header:
+---+
HTTP/1.1 200 OK
Content-Type: application/xml
Content-Length: 98
Server: Jetty(6.1.26)
+---+
Response Body:
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<appqueue>
<queue>default</queue>
</appqueue>
+---+
HTTP Request
-----
PUT http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/queue
----
Request Body:
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<appqueue>
<queue>test</queue>
</appqueue>
+---+
Response Header:
+---+
HTTP/1.1 200 OK
Content-Type: application/xml
Content-Length: 95
Server: Jetty(6.1.26)
+---+
Response Body:
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<appqueue>
<queue>test</queue>
</appqueue>
+---+
* Cluster {Delegation Tokens API}
The Delegation Tokens API can be used to create, renew and cancel YARN ResourceManager delegation tokens. All delegation token requests must be carried out on a Kerberos authenticated connection(using SPNEGO). Carrying out operations on a non-kerberos connection will result in a FORBIDDEN response. In case of renewing a token, only the renewer specified when creating the token can renew the token. Other users(including the owner) are forbidden from renewing tokens. It should be noted that when cancelling or renewing a token, the token to be cancelled or renewed is specified by setting a header.