YARN-6897. Refactoring RMWebServices by moving some util methods to RMWebAppUtil. (Giovanni Matteo Fumarola via Subru).

This commit is contained in:
Subru Krishnan 2017-08-08 14:01:22 -07:00
parent c5b3d80a4b
commit e9ec1e8f87
2 changed files with 196 additions and 171 deletions

View File

@ -18,21 +18,45 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LogAggregationContextInfo;
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
import org.apache.hadoop.yarn.webapp.BadRequestException;
/**
* Util class for ResourceManager WebApp.
@ -146,4 +170,175 @@ public final class RMWebAppUtil {
}
}
}
/**
* Create the actual ApplicationSubmissionContext to be submitted to the RM
* from the information provided by the user.
*
* @param newApp the information provided by the user
* @param conf RM configuration
* @return returns the constructed ApplicationSubmissionContext
* @throws IOException in case of Error
*/
public static ApplicationSubmissionContext createAppSubmissionContext(
ApplicationSubmissionContextInfo newApp, Configuration conf)
throws IOException {
// create local resources and app submission context
ApplicationId appid;
String error =
"Could not parse application id " + newApp.getApplicationId();
try {
appid = ApplicationId.fromString(newApp.getApplicationId());
} catch (Exception e) {
throw new BadRequestException(error);
}
ApplicationSubmissionContext appContext = ApplicationSubmissionContext
.newInstance(appid, newApp.getApplicationName(), newApp.getQueue(),
Priority.newInstance(newApp.getPriority()),
createContainerLaunchContext(newApp), newApp.getUnmanagedAM(),
newApp.getCancelTokensWhenComplete(), newApp.getMaxAppAttempts(),
createAppSubmissionContextResource(newApp, conf),
newApp.getApplicationType(),
newApp.getKeepContainersAcrossApplicationAttempts(),
newApp.getAppNodeLabelExpression(),
newApp.getAMContainerNodeLabelExpression());
appContext.setApplicationTags(newApp.getApplicationTags());
appContext.setAttemptFailuresValidityInterval(
newApp.getAttemptFailuresValidityInterval());
if (newApp.getLogAggregationContextInfo() != null) {
appContext.setLogAggregationContext(
createLogAggregationContext(newApp.getLogAggregationContextInfo()));
}
String reservationIdStr = newApp.getReservationId();
if (reservationIdStr != null && !reservationIdStr.isEmpty()) {
ReservationId reservationId =
ReservationId.parseReservationId(reservationIdStr);
appContext.setReservationID(reservationId);
}
return appContext;
}
/**
* Create the actual Resource inside the ApplicationSubmissionContextInfo to
* be submitted to the RM from the information provided by the user.
*
* @param newApp the information provided by the user
* @param conf RM configuration
* @return returns the constructed Resource inside the
* ApplicationSubmissionContextInfo
* @throws BadRequestException
*/
private static Resource createAppSubmissionContextResource(
ApplicationSubmissionContextInfo newApp, Configuration conf)
throws BadRequestException {
if (newApp.getResource().getvCores() > conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)) {
String msg = "Requested more cores than configured max";
throw new BadRequestException(msg);
}
if (newApp.getResource().getMemorySize() > conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)) {
String msg = "Requested more memory than configured max";
throw new BadRequestException(msg);
}
Resource r = Resource.newInstance(newApp.getResource().getMemorySize(),
newApp.getResource().getvCores());
return r;
}
/**
* Create the ContainerLaunchContext required for the
* ApplicationSubmissionContext. This function takes the user information and
* generates the ByteBuffer structures required by the ContainerLaunchContext
*
* @param newApp the information provided by the user
* @return created context
* @throws BadRequestException
* @throws IOException
*/
private static ContainerLaunchContext createContainerLaunchContext(
ApplicationSubmissionContextInfo newApp)
throws BadRequestException, IOException {
// create container launch context
HashMap<String, ByteBuffer> hmap = new HashMap<String, ByteBuffer>();
for (Map.Entry<String, String> entry : newApp
.getContainerLaunchContextInfo().getAuxillaryServiceData().entrySet()) {
if (!entry.getValue().isEmpty()) {
Base64 decoder = new Base64(0, null, true);
byte[] data = decoder.decode(entry.getValue());
hmap.put(entry.getKey(), ByteBuffer.wrap(data));
}
}
HashMap<String, LocalResource> hlr = new HashMap<String, LocalResource>();
for (Map.Entry<String, LocalResourceInfo> entry : newApp
.getContainerLaunchContextInfo().getResources().entrySet()) {
LocalResourceInfo l = entry.getValue();
LocalResource lr = LocalResource.newInstance(URL.fromURI(l.getUrl()),
l.getType(), l.getVisibility(), l.getSize(), l.getTimestamp());
hlr.put(entry.getKey(), lr);
}
DataOutputBuffer out = new DataOutputBuffer();
Credentials cs = createCredentials(
newApp.getContainerLaunchContextInfo().getCredentials());
cs.writeTokenStorageToStream(out);
ByteBuffer tokens = ByteBuffer.wrap(out.getData());
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(hlr,
newApp.getContainerLaunchContextInfo().getEnvironment(),
newApp.getContainerLaunchContextInfo().getCommands(), hmap, tokens,
newApp.getContainerLaunchContextInfo().getAcls());
return ctx;
}
/**
* Generate a Credentials object from the information in the CredentialsInfo
* object.
*
* @param credentials the CredentialsInfo provided by the user.
* @return created credentials
*/
private static Credentials createCredentials(CredentialsInfo credentials) {
Credentials ret = new Credentials();
try {
for (Map.Entry<String, String> entry : credentials.getTokens()
.entrySet()) {
Text alias = new Text(entry.getKey());
Token<TokenIdentifier> token = new Token<TokenIdentifier>();
token.decodeFromUrlString(entry.getValue());
ret.addToken(alias, token);
}
for (Map.Entry<String, String> entry : credentials.getSecrets()
.entrySet()) {
Text alias = new Text(entry.getKey());
Base64 decoder = new Base64(0, null, true);
byte[] secret = decoder.decode(entry.getValue());
ret.addSecretKey(alias, secret);
}
} catch (IOException ie) {
throw new BadRequestException(
"Could not parse credentials data; exception message = "
+ ie.getMessage());
}
return ret;
}
private static LogAggregationContext createLogAggregationContext(
LogAggregationContextInfo logAggregationContextInfo) {
return LogAggregationContext.newInstance(
logAggregationContextInfo.getIncludePattern(),
logAggregationContextInfo.getExcludePattern(),
logAggregationContextInfo.getRolledLogsIncludePattern(),
logAggregationContextInfo.getRolledLogsExcludePattern(),
logAggregationContextInfo.getLogAggregationPolicyClassName(),
logAggregationContextInfo.getLogAggregationPolicyParameters());
}
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.nio.ByteBuffer;
import java.security.AccessControlException;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
@ -57,21 +56,17 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
@ -103,10 +98,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
@ -118,7 +110,6 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -159,13 +150,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LogAggregationContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
@ -1559,7 +1547,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
}
ApplicationSubmissionContext appContext =
createAppSubmissionContext(newApp);
RMWebAppUtil.createAppSubmissionContext(newApp, conf);
final SubmitApplicationRequest req =
SubmitApplicationRequest.newInstance(appContext);
@ -1610,153 +1598,6 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
return appId;
}
/**
* Create the actual ApplicationSubmissionContext to be submitted to the RM
* from the information provided by the user.
*
* @param newApp the information provided by the user
* @return returns the constructed ApplicationSubmissionContext
* @throws IOException
*/
protected ApplicationSubmissionContext createAppSubmissionContext(
ApplicationSubmissionContextInfo newApp) throws IOException {
// create local resources and app submission context
ApplicationId appid;
String error =
"Could not parse application id " + newApp.getApplicationId();
try {
appid = ApplicationId.fromString(newApp.getApplicationId());
} catch (Exception e) {
throw new BadRequestException(error);
}
ApplicationSubmissionContext appContext = ApplicationSubmissionContext
.newInstance(appid, newApp.getApplicationName(), newApp.getQueue(),
Priority.newInstance(newApp.getPriority()),
createContainerLaunchContext(newApp), newApp.getUnmanagedAM(),
newApp.getCancelTokensWhenComplete(), newApp.getMaxAppAttempts(),
createAppSubmissionContextResource(newApp),
newApp.getApplicationType(),
newApp.getKeepContainersAcrossApplicationAttempts(),
newApp.getAppNodeLabelExpression(),
newApp.getAMContainerNodeLabelExpression());
appContext.setApplicationTags(newApp.getApplicationTags());
appContext.setAttemptFailuresValidityInterval(
newApp.getAttemptFailuresValidityInterval());
if (newApp.getLogAggregationContextInfo() != null) {
appContext.setLogAggregationContext(
createLogAggregationContext(newApp.getLogAggregationContextInfo()));
}
String reservationIdStr = newApp.getReservationId();
if (reservationIdStr != null && !reservationIdStr.isEmpty()) {
ReservationId reservationId =
ReservationId.parseReservationId(reservationIdStr);
appContext.setReservationID(reservationId);
}
return appContext;
}
protected Resource createAppSubmissionContextResource(
ApplicationSubmissionContextInfo newApp) throws BadRequestException {
if (newApp.getResource().getvCores() > rm.getConfig().getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)) {
String msg = "Requested more cores than configured max";
throw new BadRequestException(msg);
}
if (newApp.getResource().getMemorySize() > rm.getConfig().getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)) {
String msg = "Requested more memory than configured max";
throw new BadRequestException(msg);
}
Resource r = Resource.newInstance(newApp.getResource().getMemorySize(),
newApp.getResource().getvCores());
return r;
}
/**
* Create the ContainerLaunchContext required for the
* ApplicationSubmissionContext. This function takes the user information and
* generates the ByteBuffer structures required by the ContainerLaunchContext
*
* @param newApp the information provided by the user
* @return created context
* @throws BadRequestException
* @throws IOException
*/
protected ContainerLaunchContext createContainerLaunchContext(
ApplicationSubmissionContextInfo newApp)
throws BadRequestException, IOException {
// create container launch context
HashMap<String, ByteBuffer> hmap = new HashMap<String, ByteBuffer>();
for (Map.Entry<String, String> entry : newApp
.getContainerLaunchContextInfo().getAuxillaryServiceData().entrySet()) {
if (entry.getValue().isEmpty() == false) {
Base64 decoder = new Base64(0, null, true);
byte[] data = decoder.decode(entry.getValue());
hmap.put(entry.getKey(), ByteBuffer.wrap(data));
}
}
HashMap<String, LocalResource> hlr = new HashMap<String, LocalResource>();
for (Map.Entry<String, LocalResourceInfo> entry : newApp
.getContainerLaunchContextInfo().getResources().entrySet()) {
LocalResourceInfo l = entry.getValue();
LocalResource lr = LocalResource.newInstance(URL.fromURI(l.getUrl()),
l.getType(), l.getVisibility(), l.getSize(), l.getTimestamp());
hlr.put(entry.getKey(), lr);
}
DataOutputBuffer out = new DataOutputBuffer();
Credentials cs = createCredentials(
newApp.getContainerLaunchContextInfo().getCredentials());
cs.writeTokenStorageToStream(out);
ByteBuffer tokens = ByteBuffer.wrap(out.getData());
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(hlr,
newApp.getContainerLaunchContextInfo().getEnvironment(),
newApp.getContainerLaunchContextInfo().getCommands(), hmap, tokens,
newApp.getContainerLaunchContextInfo().getAcls());
return ctx;
}
/**
* Generate a Credentials object from the information in the CredentialsInfo
* object.
*
* @param credentials the CredentialsInfo provided by the user.
* @return
*/
private Credentials createCredentials(CredentialsInfo credentials) {
Credentials ret = new Credentials();
try {
for (Map.Entry<String, String> entry : credentials.getTokens()
.entrySet()) {
Text alias = new Text(entry.getKey());
Token<TokenIdentifier> token = new Token<TokenIdentifier>();
token.decodeFromUrlString(entry.getValue());
ret.addToken(alias, token);
}
for (Map.Entry<String, String> entry : credentials.getSecrets()
.entrySet()) {
Text alias = new Text(entry.getKey());
Base64 decoder = new Base64(0, null, true);
byte[] secret = decoder.decode(entry.getValue());
ret.addSecretKey(alias, secret);
}
} catch (IOException ie) {
throw new BadRequestException(
"Could not parse credentials data; exception message = "
+ ie.getMessage());
}
return ret;
}
private UserGroupInformation createKerberosUserGroupInformation(
HttpServletRequest hsr) throws AuthorizationException, YarnException {
@ -1785,17 +1626,6 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
return callerUGI;
}
private LogAggregationContext createLogAggregationContext(
LogAggregationContextInfo logAggregationContextInfo) {
return LogAggregationContext.newInstance(
logAggregationContextInfo.getIncludePattern(),
logAggregationContextInfo.getExcludePattern(),
logAggregationContextInfo.getRolledLogsIncludePattern(),
logAggregationContextInfo.getRolledLogsExcludePattern(),
logAggregationContextInfo.getLogAggregationPolicyClassName(),
logAggregationContextInfo.getLogAggregationPolicyParameters());
}
@POST
@Path(RMWSConsts.DELEGATION_TOKEN)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })