YARN-1713. Added get-new-app and submit-app functionality to RM web services. Contributed by Varun Vasudev.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1607216 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-07-02 00:23:07 +00:00
parent e6b0f6d346
commit 075ff276ca
12 changed files with 1887 additions and 294 deletions

View File

@ -51,6 +51,9 @@ Release 2.5.0 - UNRELEASED
YARN-2052. Embedded an epoch number in container id to ensure the uniqueness
of container id after RM restarts. (Tsuyoshi OZAWA via jianhe)
YARN-1713. Added get-new-app and submit-app functionality to RM web services.
(Varun Vasudev via vinodkv)
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

View File

@ -88,7 +88,7 @@ public static ApplicationSubmissionContext newInstance(
int maxAppAttempts, Resource resource, String applicationType) {
return newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
resource, null, false);
resource, applicationType, false);
}
@Public

View File

@ -21,10 +21,12 @@
import java.io.IOException;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import javax.xml.bind.UnmarshalException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -87,6 +89,9 @@ public Response toResponse(Exception e) {
s = Response.Status.BAD_REQUEST;
} else if (e instanceof BadRequestException) {
s = Response.Status.BAD_REQUEST;
} else if (e instanceof WebApplicationException
&& e.getCause() instanceof UnmarshalException) {
s = Response.Status.BAD_REQUEST;
} else {
LOG.warn("INTERNAL_SERVER_ERROR", e);
s = Response.Status.INTERNAL_SERVER_ERROR;

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.AccessControlException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
@ -36,6 +37,7 @@
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@ -47,22 +49,38 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -81,17 +99,22 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
@ -758,4 +781,256 @@ private UserGroupInformation getCallerUserGroupInformation(
return callerUGI;
}
/**
* Generates a new ApplicationId which is then sent to the client
*
* @param hsr
* the servlet request
* @return Response containing the app id and the maximum resource
* capabilities
* @throws AuthorizationException
* @throws IOException
* @throws InterruptedException
*/
@POST
@Path("/apps/new-application")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response createNewApplication(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
init();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
if (callerUGI == null) {
throw new AuthorizationException("Unable to obtain user name, "
+ "user not authenticated");
}
NewApplication appId = createNewApplication();
return Response.status(Status.OK).entity(appId).build();
}
// reuse the code in ClientRMService to create new app
// get the new app id and submit app
// set location header with new app location
/**
* Function to submit an app to the RM
*
* @param newApp
* structure containing information to construct the
* ApplicationSubmissionContext
* @param hsr
* the servlet request
* @return Response containing the status code
* @throws AuthorizationException
* @throws IOException
* @throws InterruptedException
*/
@POST
@Path("/apps")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
@Context HttpServletRequest hsr) throws AuthorizationException,
IOException, InterruptedException {
init();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
if (callerUGI == null) {
throw new AuthorizationException("Unable to obtain user name, "
+ "user not authenticated");
}
ApplicationSubmissionContext appContext =
createAppSubmissionContext(newApp);
final SubmitApplicationRequest req =
SubmitApplicationRequest.newInstance(appContext);
try {
callerUGI
.doAs(new PrivilegedExceptionAction<SubmitApplicationResponse>() {
@Override
public SubmitApplicationResponse run() throws IOException,
YarnException {
return rm.getClientRMService().submitApplication(req);
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
throw new BadRequestException(ue.getCause().getMessage());
}
LOG.info("Submit app request failed", ue);
throw ue;
}
String url = hsr.getRequestURL() + "/" + newApp.getApplicationId();
return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, url)
.build();
}
/**
* Function that actually creates the ApplicationId by calling the
* ClientRMService
*
* @return returns structure containing the app-id and maximum resource
* capabilities
*/
private NewApplication createNewApplication() {
GetNewApplicationRequest req =
recordFactory.newRecordInstance(GetNewApplicationRequest.class);
GetNewApplicationResponse resp;
try {
resp = rm.getClientRMService().getNewApplication(req);
} catch (YarnException e) {
String msg = "Unable to create new app from RM web service";
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
NewApplication appId =
new NewApplication(resp.getApplicationId().toString(), new ResourceInfo(
resp.getMaximumResourceCapability()));
return appId;
}
/**
* Create the actual ApplicationSubmissionContext to be submitted to the RM
* from the information provided by the user.
*
* @param newApp
* the information provided by the user
* @return returns the constructed ApplicationSubmissionContext
* @throws IOException
*/
protected ApplicationSubmissionContext createAppSubmissionContext(
ApplicationSubmissionContextInfo newApp) throws IOException {
// create local resources and app submission context
ApplicationId appid;
String error =
"Could not parse application id " + newApp.getApplicationId();
try {
appid =
ConverterUtils.toApplicationId(recordFactory,
newApp.getApplicationId());
} catch (Exception e) {
throw new BadRequestException(error);
}
ApplicationSubmissionContext appContext =
ApplicationSubmissionContext.newInstance(appid,
newApp.getApplicationName(), newApp.getQueue(),
Priority.newInstance(newApp.getPriority()),
createContainerLaunchContext(newApp), newApp.getUnmanagedAM(),
newApp.getCancelTokensWhenComplete(), newApp.getMaxAppAttempts(),
createAppSubmissionContextResource(newApp),
newApp.getApplicationType(),
newApp.getKeepContainersAcrossApplicationAttempts());
appContext.setApplicationTags(newApp.getApplicationTags());
return appContext;
}
protected Resource createAppSubmissionContextResource(
ApplicationSubmissionContextInfo newApp) throws BadRequestException {
if (newApp.getResource().getvCores() > rm.getConfig().getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)) {
String msg = "Requested more cores than configured max";
throw new BadRequestException(msg);
}
if (newApp.getResource().getMemory() > rm.getConfig().getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)) {
String msg = "Requested more memory than configured max";
throw new BadRequestException(msg);
}
Resource r =
Resource.newInstance(newApp.getResource().getMemory(), newApp
.getResource().getvCores());
return r;
}
/**
* Create the ContainerLaunchContext required for the
* ApplicationSubmissionContext. This function takes the user information and
* generates the ByteBuffer structures required by the ContainerLaunchContext
*
* @param newApp
* the information provided by the user
* @return
* @throws BadRequestException
* @throws IOException
*/
protected ContainerLaunchContext createContainerLaunchContext(
ApplicationSubmissionContextInfo newApp) throws BadRequestException, IOException {
// create container launch context
HashMap<String, ByteBuffer> hmap = new HashMap<String, ByteBuffer>();
for (Map.Entry<String, String> entry : newApp
.getContainerLaunchContextInfo().getAuxillaryServiceData().entrySet()) {
if (entry.getValue().isEmpty() == false) {
Base64 decoder = new Base64(0, null, true);
byte[] data = decoder.decode(entry.getValue());
hmap.put(entry.getKey(), ByteBuffer.wrap(data));
}
}
HashMap<String, LocalResource> hlr = new HashMap<String, LocalResource>();
for (Map.Entry<String, LocalResourceInfo> entry : newApp
.getContainerLaunchContextInfo().getResources().entrySet()) {
LocalResourceInfo l = entry.getValue();
LocalResource lr =
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromURI(l.getUrl()), l.getType(),
l.getVisibility(), l.getSize(), l.getTimestamp());
hlr.put(entry.getKey(), lr);
}
DataOutputBuffer out = new DataOutputBuffer();
Credentials cs =
createCredentials(newApp.getContainerLaunchContextInfo()
.getCredentials());
cs.writeTokenStorageToStream(out);
ByteBuffer tokens = ByteBuffer.wrap(out.getData());
ContainerLaunchContext ctx =
ContainerLaunchContext.newInstance(hlr, newApp
.getContainerLaunchContextInfo().getEnvironment(), newApp
.getContainerLaunchContextInfo().getCommands(), hmap, tokens, newApp
.getContainerLaunchContextInfo().getAcls());
return ctx;
}
/**
* Generate a Credentials object from the information in the CredentialsInfo
* object.
*
* @param credentials
* the CredentialsInfo provided by the user.
* @return
*/
private Credentials createCredentials(CredentialsInfo credentials) {
Credentials ret = new Credentials();
try {
for (Map.Entry<String, String> entry : credentials.getTokens().entrySet()) {
Text alias = new Text(entry.getKey());
Token<TokenIdentifier> token = new Token<TokenIdentifier>();
token.decodeFromUrlString(entry.getValue());
ret.addToken(alias, token);
}
for (Map.Entry<String, String> entry : credentials.getTokens().entrySet()) {
Text alias = new Text(entry.getKey());
Base64 decoder = new Base64(0, null, true);
byte[] secret = decoder.decode(entry.getValue());
ret.addSecretKey(alias, secret);
}
} catch (IOException ie) {
throw new BadRequestException(
"Could not parse credentials data; exception message = "
+ ie.getMessage());
}
return ret;
}
}

View File

@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import java.util.HashSet;
import java.util.Set;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.Priority;
/**
* Simple class to allow users to send information required to create an
* ApplicationSubmissionContext which can then be used to submit an app
*
*/
@XmlRootElement(name = "application-submission-context")
@XmlAccessorType(XmlAccessType.FIELD)
public class ApplicationSubmissionContextInfo {
@XmlElement(name = "application-id")
String applicationId;
@XmlElement(name = "application-name")
String applicationName;
String queue;
int priority;
@XmlElement(name = "am-container-spec")
ContainerLaunchContextInfo containerInfo;
@XmlElement(name = "unmanaged-AM")
boolean isUnmanagedAM;
@XmlElement(name = "cancel-tokens-when-complete")
boolean cancelTokensWhenComplete;
@XmlElement(name = "max-app-attempts")
int maxAppAttempts;
@XmlElement(name = "resource")
ResourceInfo resource;
@XmlElement(name = "application-type")
String applicationType;
@XmlElement(name = "keep-containers-across-application-attempts")
boolean keepContainers;
@XmlElementWrapper(name = "application-tags")
@XmlElement(name = "tag")
Set<String> tags;
public ApplicationSubmissionContextInfo() {
applicationId = "";
applicationName = "";
containerInfo = new ContainerLaunchContextInfo();
resource = new ResourceInfo();
priority = Priority.UNDEFINED.getPriority();
isUnmanagedAM = false;
cancelTokensWhenComplete = true;
keepContainers = false;
applicationType = "";
tags = new HashSet<String>();
}
public String getApplicationId() {
return applicationId;
}
public String getApplicationName() {
return applicationName;
}
public String getQueue() {
return queue;
}
public int getPriority() {
return priority;
}
public ContainerLaunchContextInfo getContainerLaunchContextInfo() {
return containerInfo;
}
public boolean getUnmanagedAM() {
return isUnmanagedAM;
}
public boolean getCancelTokensWhenComplete() {
return cancelTokensWhenComplete;
}
public int getMaxAppAttempts() {
return maxAppAttempts;
}
public ResourceInfo getResource() {
return resource;
}
public String getApplicationType() {
return applicationType;
}
public boolean getKeepContainersAcrossApplicationAttempts() {
return keepContainers;
}
public Set<String> getApplicationTags() {
return tags;
}
public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public void setQueue(String queue) {
this.queue = queue;
}
public void setPriority(int priority) {
this.priority = priority;
}
public void setContainerLaunchContextInfo(
ContainerLaunchContextInfo containerLaunchContext) {
this.containerInfo = containerLaunchContext;
}
public void setUnmanagedAM(boolean isUnmanagedAM) {
this.isUnmanagedAM = isUnmanagedAM;
}
public void setCancelTokensWhenComplete(boolean cancelTokensWhenComplete) {
this.cancelTokensWhenComplete = cancelTokensWhenComplete;
}
public void setMaxAppAttempts(int maxAppAttempts) {
this.maxAppAttempts = maxAppAttempts;
}
public void setResource(ResourceInfo resource) {
this.resource = resource;
}
public void setApplicationType(String applicationType) {
this.applicationType = applicationType;
}
public void
setKeepContainersAcrossApplicationAttempts(boolean keepContainers) {
this.keepContainers = keepContainers;
}
public void setApplicationTags(Set<String> tags) {
this.tags = tags;
}
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
/**
* Simple class to allow users to send information required to create a
* ContainerLaunchContext which can then be used as part of the
* ApplicationSubmissionContext
*
*/
@XmlRootElement(name = "container-launch-context-info")
@XmlAccessorType(XmlAccessType.FIELD)
public class ContainerLaunchContextInfo {
@XmlElementWrapper(name = "local-resources")
HashMap<String, LocalResourceInfo> local_resources;
HashMap<String, String> environment;
@XmlElementWrapper(name = "commands")
@XmlElement(name = "command", type = String.class)
List<String> commands;
@XmlElementWrapper(name = "service-data")
HashMap<String, String> servicedata;
@XmlElement(name = "credentials")
CredentialsInfo credentials;
@XmlElementWrapper(name = "application-acls")
HashMap<ApplicationAccessType, String> acls;
public ContainerLaunchContextInfo() {
local_resources = new HashMap<String, LocalResourceInfo>();
environment = new HashMap<String, String>();
commands = new ArrayList<String>();
servicedata = new HashMap<String, String>();
credentials = new CredentialsInfo();
acls = new HashMap<ApplicationAccessType, String>();
}
public Map<String, LocalResourceInfo> getResources() {
return local_resources;
}
public Map<String, String> getEnvironment() {
return environment;
}
public List<String> getCommands() {
return commands;
}
public Map<String, String> getAuxillaryServiceData() {
return servicedata;
}
public CredentialsInfo getCredentials() {
return credentials;
}
public Map<ApplicationAccessType, String> getAcls() {
return acls;
}
public void setResources(HashMap<String, LocalResourceInfo> resources) {
this.local_resources = resources;
}
public void setEnvironment(HashMap<String, String> environment) {
this.environment = environment;
}
public void setCommands(List<String> commands) {
this.commands = commands;
}
public void setAuxillaryServiceData(HashMap<String, String> serviceData) {
this.servicedata = serviceData;
}
public void setCredentials(CredentialsInfo credentials) {
this.credentials = credentials;
}
public void setAcls(HashMap<ApplicationAccessType, String> acls) {
this.acls = acls;
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import java.util.HashMap;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "credentials-info")
@XmlAccessorType(XmlAccessType.FIELD)
public class CredentialsInfo {
@XmlElementWrapper(name = "tokens")
HashMap<String, String> tokens;
@XmlElementWrapper(name = "secrets")
HashMap<String, String> secrets;
public CredentialsInfo() {
tokens = new HashMap<String, String>();
secrets = new HashMap<String, String>();
}
public HashMap<String, String> getTokens() {
return tokens;
}
public HashMap<String, String> getSecrets() {
return secrets;
}
public void setTokens(HashMap<String, String> tokens) {
this.tokens = tokens;
}
public void setSecrets(HashMap<String, String> secrets) {
this.secrets = secrets;
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import java.net.URI;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@XmlRootElement(name = "localresources")
@XmlAccessorType(XmlAccessType.FIELD)
public class LocalResourceInfo {
@XmlElement(name = "resource")
URI url;
LocalResourceType type;
LocalResourceVisibility visibility;
long size;
long timestamp;
String pattern;
public URI getUrl() {
return url;
}
public LocalResourceType getType() {
return type;
}
public LocalResourceVisibility getVisibility() {
return visibility;
}
public long getSize() {
return size;
}
public long getTimestamp() {
return timestamp;
}
public String getPattern() {
return pattern;
}
public void setUrl(URI url) {
this.url = url;
}
public void setType(LocalResourceType type) {
this.type = type;
}
public void setVisibility(LocalResourceVisibility visibility) {
this.visibility = visibility;
}
public void setSize(long size) {
if (size <= 0) {
throw new IllegalArgumentException("size must be greater than 0");
}
this.size = size;
}
public void setTimestamp(long timestamp) {
if (timestamp <= 0) {
throw new IllegalArgumentException("timestamp must be greater than 0");
}
this.timestamp = timestamp;
}
public void setPattern(String pattern) {
this.pattern = pattern;
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name="NewApplication")
@XmlAccessorType(XmlAccessType.FIELD)
public class NewApplication {
@XmlElement(name="application-id")
String applicationId;
@XmlElement(name="maximum-resource-capability")
ResourceInfo maximumResourceCapability;
public NewApplication() {
applicationId = "";
maximumResourceCapability = new ResourceInfo();
}
public NewApplication(String appId, ResourceInfo maxResources) {
applicationId = appId;
maximumResourceCapability = maxResources;
}
public String getApplicationId() {
return applicationId;
}
public ResourceInfo getMaximumResourceCapability() {
return maximumResourceCapability;
}
}

View File

@ -30,7 +30,7 @@ public class ResourceInfo {
int memory;
int vCores;
public ResourceInfo() {
public ResourceInfo() {
}
public ResourceInfo(Resource res) {
@ -50,4 +50,12 @@ public int getvCores() {
public String toString() {
return "<memory:" + memory + ", vCores:" + vCores + ">";
}
public void setMemory(int memory) {
this.memory = memory;
}
public void setvCores(int vCores) {
this.vCores = vCores;
}
}

View File

@ -25,10 +25,17 @@
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
@ -38,9 +45,15 @@
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -54,7 +67,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
import org.codehaus.jettison.json.JSONException;
@ -80,6 +97,7 @@
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.filter.LoggingFilter;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONMarshaller;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
@ -461,11 +479,7 @@ public void testSingleAppKillUnauthorized() throws Exception {
.constructWebResource("apps", app.getApplicationId().toString(),
"state").accept(mediaType)
.entity(info, MediaType.APPLICATION_XML).put(ClientResponse.class);
if (!isAuthenticationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
} else {
assertEquals(Status.FORBIDDEN, response.getClientResponseStatus());
}
validateResponseStatus(response, Status.FORBIDDEN);
}
rm.stop();
return;
@ -502,4 +516,348 @@ public void tearDown() throws Exception {
}
super.tearDown();
}
/**
* Helper function to wrap frequently used code. It checks the response status
* and checks if it UNAUTHORIZED if we are running with authorization turned
* off or the param passed if we are running with authorization turned on.
*
* @param response
* the ClientResponse object to be checked
* @param expectedAuthorizedMode
* the expected Status in authorized mode.
*/
public void validateResponseStatus(ClientResponse response,
Status expectedAuthorizedMode) {
validateResponseStatus(response, Status.UNAUTHORIZED,
expectedAuthorizedMode);
}
/**
* Helper function to wrap frequently used code. It checks the response status
* and checks if it is the param expectedUnauthorizedMode if we are running
* with authorization turned off or the param expectedAuthorizedMode passed if
* we are running with authorization turned on.
*
* @param response
* the ClientResponse object to be checked
* @param expectedUnauthorizedMode
* the expected Status in unauthorized mode.
* @param expectedAuthorizedMode
* the expected Status in authorized mode.
*/
public void validateResponseStatus(ClientResponse response,
Status expectedUnauthorizedMode, Status expectedAuthorizedMode) {
if (!isAuthenticationEnabled()) {
assertEquals(expectedUnauthorizedMode, response.getClientResponseStatus());
} else {
assertEquals(expectedAuthorizedMode, response.getClientResponseStatus());
}
}
// Simple test - just post to /apps/id and validate the response
@Test
public void testGetNewApplication() throws Exception {
// client().addFilter(new LoggingFilter(System.out));
rm.start();
String mediaTypes[] =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
for (String acceptMedia : mediaTypes) {
testGetNewApplication(acceptMedia);
}
rm.stop();
return;
}
protected String testGetNewApplication(String mediaType) throws JSONException,
ParserConfigurationException, IOException, SAXException {
ClientResponse response =
this.constructWebResource("apps", "new-application").accept(mediaType)
.post(ClientResponse.class);
validateResponseStatus(response, Status.OK);
if (!isAuthenticationEnabled()) {
return "";
}
return validateGetNewApplicationResponse(response);
}
protected String validateGetNewApplicationResponse(ClientResponse resp)
throws JSONException, ParserConfigurationException, IOException,
SAXException {
String ret = "";
if (resp.getType().equals(MediaType.APPLICATION_JSON_TYPE)) {
JSONObject json = resp.getEntity(JSONObject.class);
ret = validateGetNewApplicationJsonResponse(json);
} else if (resp.getType().equals(MediaType.APPLICATION_XML_TYPE)) {
String xml = resp.getEntity(String.class);
ret = validateGetNewApplicationXMLResponse(xml);
} else {
// we should not be here
assertTrue(false);
}
return ret;
}
protected String validateGetNewApplicationJsonResponse(JSONObject json)
throws JSONException {
String appId = json.getString("application-id");
assertTrue(appId.isEmpty() == false);
JSONObject maxResources = json.getJSONObject("maximum-resource-capability");
long memory = maxResources.getLong("memory");
long vCores = maxResources.getLong("vCores");
assertTrue(memory != 0);
assertTrue(vCores != 0);
return appId;
}
protected String validateGetNewApplicationXMLResponse(String response)
throws ParserConfigurationException, IOException, SAXException {
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(response));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("NewApplication");
assertEquals("incorrect number of elements", 1, nodes.getLength());
Element element = (Element) nodes.item(0);
String appId = WebServicesTestUtils.getXmlString(element, "application-id");
assertTrue(appId.isEmpty() == false);
NodeList maxResourceNodes =
element.getElementsByTagName("maximum-resource-capability");
assertEquals(1, maxResourceNodes.getLength());
Element maxResourceCapability = (Element) maxResourceNodes.item(0);
long memory =
WebServicesTestUtils.getXmlLong(maxResourceCapability, "memory");
long vCores =
WebServicesTestUtils.getXmlLong(maxResourceCapability, "vCores");
assertTrue(memory != 0);
assertTrue(vCores != 0);
return appId;
}
// Test to validate the process of submitting apps - test for appropriate
// errors as well
@Test
public void testGetNewApplicationAndSubmit() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
amNodeManager.nodeHeartbeat(true);
String mediaTypes[] =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
for (String acceptMedia : mediaTypes) {
for (String contentMedia : mediaTypes) {
testAppSubmit(acceptMedia, contentMedia);
testAppSubmitErrors(acceptMedia, contentMedia);
}
}
rm.stop();
return;
}
public void testAppSubmit(String acceptMedia, String contentMedia)
throws Exception {
// create a test app and submit it via rest(after getting an app-id) then
// get the app details from the rmcontext and check that everything matches
// client().addFilter(new LoggingFilter(System.out));
String lrKey = "example";
String queueName = "testqueue";
String appName = "test";
String appType = "test-type";
String urlPath = "apps";
String appId = testGetNewApplication(acceptMedia);
List<String> commands = new ArrayList<String>();
commands.add("/bin/sleep 5");
HashMap<String, String> environment = new HashMap<String, String>();
environment.put("APP_VAR", "ENV_SETTING");
HashMap<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>();
acls.put(ApplicationAccessType.MODIFY_APP, "testuser1, testuser2");
acls.put(ApplicationAccessType.VIEW_APP, "testuser3, testuser4");
Set<String> tags = new HashSet<String>();
tags.add("tag1");
tags.add("tag 2");
CredentialsInfo credentials = new CredentialsInfo();
HashMap<String, String> tokens = new HashMap<String, String>();
HashMap<String, String> secrets = new HashMap<String, String>();
secrets.put("secret1", Base64.encodeBase64URLSafeString("secret1".getBytes("UTF8")));
credentials.setSecrets(secrets);
credentials.setTokens(tokens);
ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo();
appInfo.setApplicationId(appId);
appInfo.setApplicationName(appName);
appInfo.setPriority(3);
appInfo.setMaxAppAttempts(2);
appInfo.setQueue(queueName);
appInfo.setApplicationType(appType);
HashMap<String, LocalResourceInfo> lr =
new HashMap<String, LocalResourceInfo>();
LocalResourceInfo y = new LocalResourceInfo();
y.setUrl(new URI("http://www.test.com/file.txt"));
y.setSize(100);
y.setTimestamp(System.currentTimeMillis());
y.setType(LocalResourceType.FILE);
y.setVisibility(LocalResourceVisibility.APPLICATION);
lr.put(lrKey, y);
appInfo.getContainerLaunchContextInfo().setResources(lr);
appInfo.getContainerLaunchContextInfo().setCommands(commands);
appInfo.getContainerLaunchContextInfo().setEnvironment(environment);
appInfo.getContainerLaunchContextInfo().setAcls(acls);
appInfo.getContainerLaunchContextInfo().getAuxillaryServiceData()
.put("test", Base64.encodeBase64URLSafeString("value12".getBytes("UTF8")));
appInfo.getContainerLaunchContextInfo().setCredentials(credentials);
appInfo.getResource().setMemory(1024);
appInfo.getResource().setvCores(1);
appInfo.setApplicationTags(tags);
ClientResponse response =
this.constructWebResource(urlPath).accept(acceptMedia)
.entity(appInfo, contentMedia).post(ClientResponse.class);
if (this.isAuthenticationEnabled() == false) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
return;
}
assertEquals(Status.ACCEPTED, response.getClientResponseStatus());
assertTrue(response.getHeaders().getFirst(HttpHeaders.LOCATION).isEmpty() == false);
String locURL = response.getHeaders().getFirst(HttpHeaders.LOCATION);
assertTrue(locURL.indexOf("/apps/application") != -1);
appId = locURL.substring(locURL.indexOf("/apps/") + "/apps/".length());
WebResource res = resource().uri(new URI(locURL));
res = res.queryParam("user.name", webserviceUserName);
response = res.get(ClientResponse.class);
assertEquals(Status.OK, response.getClientResponseStatus());
RMApp app =
rm.getRMContext().getRMApps()
.get(ConverterUtils.toApplicationId(appId));
assertEquals(appName, app.getName());
assertEquals(webserviceUserName, app.getUser());
assertEquals(2, app.getMaxAppAttempts());
assertEquals(queueName, app.getQueue());
assertEquals(appType, app.getApplicationType());
assertEquals(tags, app.getApplicationTags());
ContainerLaunchContext ctx =
app.getApplicationSubmissionContext().getAMContainerSpec();
assertEquals(commands, ctx.getCommands());
assertEquals(environment, ctx.getEnvironment());
assertEquals(acls, ctx.getApplicationACLs());
Map<String, LocalResource> appLRs = ctx.getLocalResources();
assertTrue(appLRs.containsKey(lrKey));
LocalResource exampleLR = appLRs.get(lrKey);
assertEquals(ConverterUtils.getYarnUrlFromURI(y.getUrl()),
exampleLR.getResource());
assertEquals(y.getSize(), exampleLR.getSize());
assertEquals(y.getTimestamp(), exampleLR.getTimestamp());
assertEquals(y.getType(), exampleLR.getType());
assertEquals(y.getPattern(), exampleLR.getPattern());
assertEquals(y.getVisibility(), exampleLR.getVisibility());
response =
this.constructWebResource("apps", appId).accept(acceptMedia)
.get(ClientResponse.class);
assertEquals(Status.OK, response.getClientResponseStatus());
return;
}
public void testAppSubmitErrors(String acceptMedia, String contentMedia)
throws Exception {
// submit a bunch of bad requests(correct format but bad values) via the
// REST API and make sure we get the right error response codes
String urlPath = "apps";
String appId = "";
ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo();
ClientResponse response =
this.constructWebResource(urlPath).accept(acceptMedia)
.entity(appInfo, contentMedia).post(ClientResponse.class);
validateResponseStatus(response, Status.BAD_REQUEST);
appId = "random";
appInfo.setApplicationId(appId);
response =
this.constructWebResource(urlPath).accept(acceptMedia)
.entity(appInfo, contentMedia).post(ClientResponse.class);
validateResponseStatus(response, Status.BAD_REQUEST);
appId = "random_junk";
appInfo.setApplicationId(appId);
response =
this.constructWebResource(urlPath).accept(acceptMedia)
.entity(appInfo, contentMedia).post(ClientResponse.class);
validateResponseStatus(response, Status.BAD_REQUEST);
// bad resource info
appInfo.getResource().setMemory(
rm.getConfig().getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB) + 1);
appInfo.getResource().setvCores(1);
response =
this.constructWebResource(urlPath).accept(acceptMedia)
.entity(appInfo, contentMedia).post(ClientResponse.class);
validateResponseStatus(response, Status.BAD_REQUEST);
appInfo.getResource().setvCores(
rm.getConfig().getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES) + 1);
appInfo.getResource().setMemory(CONTAINER_MB);
response =
this.constructWebResource(urlPath).accept(acceptMedia)
.entity(appInfo, contentMedia).post(ClientResponse.class);
validateResponseStatus(response, Status.BAD_REQUEST);
return;
}
@Test
public void testAppSubmitBadJsonAndXML() throws Exception {
// submit a bunch of bad XML and JSON via the
// REST API and make sure we get error response codes
String urlPath = "apps";
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
amNodeManager.nodeHeartbeat(true);
ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo();
appInfo.setApplicationName("test");
appInfo.setPriority(3);
appInfo.setMaxAppAttempts(2);
appInfo.setQueue("testqueue");
appInfo.setApplicationType("test-type");
HashMap<String, LocalResourceInfo> lr =
new HashMap<String, LocalResourceInfo>();
LocalResourceInfo y = new LocalResourceInfo();
y.setUrl(new URI("http://www.test.com/file.txt"));
y.setSize(100);
y.setTimestamp(System.currentTimeMillis());
y.setType(LocalResourceType.FILE);
y.setVisibility(LocalResourceVisibility.APPLICATION);
lr.put("example", y);
appInfo.getContainerLaunchContextInfo().setResources(lr);
appInfo.getResource().setMemory(1024);
appInfo.getResource().setvCores(1);
String body =
"<?xml version=\"1.0\" encoding=\"UTF-8\" "
+ "standalone=\"yes\"?><blah/>";
ClientResponse response =
this.constructWebResource(urlPath).accept(MediaType.APPLICATION_XML)
.entity(body, MediaType.APPLICATION_XML).post(ClientResponse.class);
assertEquals(Status.BAD_REQUEST, response.getClientResponseStatus());
body = "{\"a\" : \"b\"}";
response =
this.constructWebResource(urlPath).accept(MediaType.APPLICATION_XML)
.entity(body, MediaType.APPLICATION_JSON).post(ClientResponse.class);
validateResponseStatus(response, Status.BAD_REQUEST);
rm.stop();
}
}