YARN-6897. Refactoring RMWebServices by moving some util methods to RMWebAppUtil. (Giovanni Matteo Fumarola via Subru).
This commit is contained in:
parent
713349a9af
commit
bcde66bed1
|
@ -18,21 +18,45 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.http.lib.StaticUserWebFilter;
|
import org.apache.hadoop.http.lib.StaticUserWebFilter;
|
||||||
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.AuthenticationFilterInitializer;
|
import org.apache.hadoop.security.AuthenticationFilterInitializer;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
|
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
|
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LogAggregationContextInfo;
|
||||||
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
|
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
|
||||||
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
|
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
|
||||||
|
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Util class for ResourceManager WebApp.
|
* Util class for ResourceManager WebApp.
|
||||||
|
@ -146,4 +170,175 @@ public final class RMWebAppUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create the actual ApplicationSubmissionContext to be submitted to the RM
|
||||||
|
* from the information provided by the user.
|
||||||
|
*
|
||||||
|
* @param newApp the information provided by the user
|
||||||
|
* @param conf RM configuration
|
||||||
|
* @return returns the constructed ApplicationSubmissionContext
|
||||||
|
* @throws IOException in case of Error
|
||||||
|
*/
|
||||||
|
public static ApplicationSubmissionContext createAppSubmissionContext(
|
||||||
|
ApplicationSubmissionContextInfo newApp, Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
// create local resources and app submission context
|
||||||
|
|
||||||
|
ApplicationId appid;
|
||||||
|
String error =
|
||||||
|
"Could not parse application id " + newApp.getApplicationId();
|
||||||
|
try {
|
||||||
|
appid = ApplicationId.fromString(newApp.getApplicationId());
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new BadRequestException(error);
|
||||||
|
}
|
||||||
|
ApplicationSubmissionContext appContext = ApplicationSubmissionContext
|
||||||
|
.newInstance(appid, newApp.getApplicationName(), newApp.getQueue(),
|
||||||
|
Priority.newInstance(newApp.getPriority()),
|
||||||
|
createContainerLaunchContext(newApp), newApp.getUnmanagedAM(),
|
||||||
|
newApp.getCancelTokensWhenComplete(), newApp.getMaxAppAttempts(),
|
||||||
|
createAppSubmissionContextResource(newApp, conf),
|
||||||
|
newApp.getApplicationType(),
|
||||||
|
newApp.getKeepContainersAcrossApplicationAttempts(),
|
||||||
|
newApp.getAppNodeLabelExpression(),
|
||||||
|
newApp.getAMContainerNodeLabelExpression());
|
||||||
|
appContext.setApplicationTags(newApp.getApplicationTags());
|
||||||
|
appContext.setAttemptFailuresValidityInterval(
|
||||||
|
newApp.getAttemptFailuresValidityInterval());
|
||||||
|
if (newApp.getLogAggregationContextInfo() != null) {
|
||||||
|
appContext.setLogAggregationContext(
|
||||||
|
createLogAggregationContext(newApp.getLogAggregationContextInfo()));
|
||||||
|
}
|
||||||
|
String reservationIdStr = newApp.getReservationId();
|
||||||
|
if (reservationIdStr != null && !reservationIdStr.isEmpty()) {
|
||||||
|
ReservationId reservationId =
|
||||||
|
ReservationId.parseReservationId(reservationIdStr);
|
||||||
|
appContext.setReservationID(reservationId);
|
||||||
|
}
|
||||||
|
return appContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create the actual Resource inside the ApplicationSubmissionContextInfo to
|
||||||
|
* be submitted to the RM from the information provided by the user.
|
||||||
|
*
|
||||||
|
* @param newApp the information provided by the user
|
||||||
|
* @param conf RM configuration
|
||||||
|
* @return returns the constructed Resource inside the
|
||||||
|
* ApplicationSubmissionContextInfo
|
||||||
|
* @throws BadRequestException
|
||||||
|
*/
|
||||||
|
private static Resource createAppSubmissionContextResource(
|
||||||
|
ApplicationSubmissionContextInfo newApp, Configuration conf)
|
||||||
|
throws BadRequestException {
|
||||||
|
if (newApp.getResource().getvCores() > conf.getInt(
|
||||||
|
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)) {
|
||||||
|
String msg = "Requested more cores than configured max";
|
||||||
|
throw new BadRequestException(msg);
|
||||||
|
}
|
||||||
|
if (newApp.getResource().getMemorySize() > conf.getInt(
|
||||||
|
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)) {
|
||||||
|
String msg = "Requested more memory than configured max";
|
||||||
|
throw new BadRequestException(msg);
|
||||||
|
}
|
||||||
|
Resource r = Resource.newInstance(newApp.getResource().getMemorySize(),
|
||||||
|
newApp.getResource().getvCores());
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create the ContainerLaunchContext required for the
|
||||||
|
* ApplicationSubmissionContext. This function takes the user information and
|
||||||
|
* generates the ByteBuffer structures required by the ContainerLaunchContext
|
||||||
|
*
|
||||||
|
* @param newApp the information provided by the user
|
||||||
|
* @return created context
|
||||||
|
* @throws BadRequestException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private static ContainerLaunchContext createContainerLaunchContext(
|
||||||
|
ApplicationSubmissionContextInfo newApp)
|
||||||
|
throws BadRequestException, IOException {
|
||||||
|
|
||||||
|
// create container launch context
|
||||||
|
|
||||||
|
HashMap<String, ByteBuffer> hmap = new HashMap<String, ByteBuffer>();
|
||||||
|
for (Map.Entry<String, String> entry : newApp
|
||||||
|
.getContainerLaunchContextInfo().getAuxillaryServiceData().entrySet()) {
|
||||||
|
if (!entry.getValue().isEmpty()) {
|
||||||
|
Base64 decoder = new Base64(0, null, true);
|
||||||
|
byte[] data = decoder.decode(entry.getValue());
|
||||||
|
hmap.put(entry.getKey(), ByteBuffer.wrap(data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
HashMap<String, LocalResource> hlr = new HashMap<String, LocalResource>();
|
||||||
|
for (Map.Entry<String, LocalResourceInfo> entry : newApp
|
||||||
|
.getContainerLaunchContextInfo().getResources().entrySet()) {
|
||||||
|
LocalResourceInfo l = entry.getValue();
|
||||||
|
LocalResource lr = LocalResource.newInstance(URL.fromURI(l.getUrl()),
|
||||||
|
l.getType(), l.getVisibility(), l.getSize(), l.getTimestamp());
|
||||||
|
hlr.put(entry.getKey(), lr);
|
||||||
|
}
|
||||||
|
|
||||||
|
DataOutputBuffer out = new DataOutputBuffer();
|
||||||
|
Credentials cs = createCredentials(
|
||||||
|
newApp.getContainerLaunchContextInfo().getCredentials());
|
||||||
|
cs.writeTokenStorageToStream(out);
|
||||||
|
ByteBuffer tokens = ByteBuffer.wrap(out.getData());
|
||||||
|
|
||||||
|
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(hlr,
|
||||||
|
newApp.getContainerLaunchContextInfo().getEnvironment(),
|
||||||
|
newApp.getContainerLaunchContextInfo().getCommands(), hmap, tokens,
|
||||||
|
newApp.getContainerLaunchContextInfo().getAcls());
|
||||||
|
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate a Credentials object from the information in the CredentialsInfo
|
||||||
|
* object.
|
||||||
|
*
|
||||||
|
* @param credentials the CredentialsInfo provided by the user.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private static Credentials createCredentials(CredentialsInfo credentials) {
|
||||||
|
Credentials ret = new Credentials();
|
||||||
|
try {
|
||||||
|
for (Map.Entry<String, String> entry : credentials.getTokens()
|
||||||
|
.entrySet()) {
|
||||||
|
Text alias = new Text(entry.getKey());
|
||||||
|
Token<TokenIdentifier> token = new Token<TokenIdentifier>();
|
||||||
|
token.decodeFromUrlString(entry.getValue());
|
||||||
|
ret.addToken(alias, token);
|
||||||
|
}
|
||||||
|
for (Map.Entry<String, String> entry : credentials.getSecrets()
|
||||||
|
.entrySet()) {
|
||||||
|
Text alias = new Text(entry.getKey());
|
||||||
|
Base64 decoder = new Base64(0, null, true);
|
||||||
|
byte[] secret = decoder.decode(entry.getValue());
|
||||||
|
ret.addSecretKey(alias, secret);
|
||||||
|
}
|
||||||
|
} catch (IOException ie) {
|
||||||
|
throw new BadRequestException(
|
||||||
|
"Could not parse credentials data; exception message = "
|
||||||
|
+ ie.getMessage());
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static LogAggregationContext createLogAggregationContext(
|
||||||
|
LogAggregationContextInfo logAggregationContextInfo) {
|
||||||
|
return LogAggregationContext.newInstance(
|
||||||
|
logAggregationContextInfo.getIncludePattern(),
|
||||||
|
logAggregationContextInfo.getExcludePattern(),
|
||||||
|
logAggregationContextInfo.getRolledLogsIncludePattern(),
|
||||||
|
logAggregationContextInfo.getRolledLogsExcludePattern(),
|
||||||
|
logAggregationContextInfo.getLogAggregationPolicyClassName(),
|
||||||
|
logAggregationContextInfo.getLogAggregationPolicyParameters());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.UndeclaredThrowableException;
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.security.AccessControlException;
|
import java.security.AccessControlException;
|
||||||
import java.security.Principal;
|
import java.security.Principal;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
@ -57,22 +56,18 @@ import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import javax.ws.rs.core.Response.Status;
|
import javax.ws.rs.core.Response.Status;
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Base64;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.http.JettyUtils;
|
import org.apache.hadoop.http.JettyUtils;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
|
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
|
||||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
||||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||||
|
@ -104,10 +99,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
|
@ -119,7 +111,6 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -136,33 +127,34 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LogAggregationContextInfo;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||||
|
@ -185,7 +177,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
|
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
||||||
|
@ -1589,7 +1580,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
}
|
}
|
||||||
|
|
||||||
ApplicationSubmissionContext appContext =
|
ApplicationSubmissionContext appContext =
|
||||||
createAppSubmissionContext(newApp);
|
RMWebAppUtil.createAppSubmissionContext(newApp, conf);
|
||||||
|
|
||||||
final SubmitApplicationRequest req =
|
final SubmitApplicationRequest req =
|
||||||
SubmitApplicationRequest.newInstance(appContext);
|
SubmitApplicationRequest.newInstance(appContext);
|
||||||
|
@ -1640,153 +1631,6 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
return appId;
|
return appId;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Create the actual ApplicationSubmissionContext to be submitted to the RM
|
|
||||||
* from the information provided by the user.
|
|
||||||
*
|
|
||||||
* @param newApp the information provided by the user
|
|
||||||
* @return returns the constructed ApplicationSubmissionContext
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
protected ApplicationSubmissionContext createAppSubmissionContext(
|
|
||||||
ApplicationSubmissionContextInfo newApp) throws IOException {
|
|
||||||
|
|
||||||
// create local resources and app submission context
|
|
||||||
|
|
||||||
ApplicationId appid;
|
|
||||||
String error =
|
|
||||||
"Could not parse application id " + newApp.getApplicationId();
|
|
||||||
try {
|
|
||||||
appid = ApplicationId.fromString(newApp.getApplicationId());
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new BadRequestException(error);
|
|
||||||
}
|
|
||||||
ApplicationSubmissionContext appContext = ApplicationSubmissionContext
|
|
||||||
.newInstance(appid, newApp.getApplicationName(), newApp.getQueue(),
|
|
||||||
Priority.newInstance(newApp.getPriority()),
|
|
||||||
createContainerLaunchContext(newApp), newApp.getUnmanagedAM(),
|
|
||||||
newApp.getCancelTokensWhenComplete(), newApp.getMaxAppAttempts(),
|
|
||||||
createAppSubmissionContextResource(newApp),
|
|
||||||
newApp.getApplicationType(),
|
|
||||||
newApp.getKeepContainersAcrossApplicationAttempts(),
|
|
||||||
newApp.getAppNodeLabelExpression(),
|
|
||||||
newApp.getAMContainerNodeLabelExpression());
|
|
||||||
appContext.setApplicationTags(newApp.getApplicationTags());
|
|
||||||
appContext.setAttemptFailuresValidityInterval(
|
|
||||||
newApp.getAttemptFailuresValidityInterval());
|
|
||||||
if (newApp.getLogAggregationContextInfo() != null) {
|
|
||||||
appContext.setLogAggregationContext(
|
|
||||||
createLogAggregationContext(newApp.getLogAggregationContextInfo()));
|
|
||||||
}
|
|
||||||
String reservationIdStr = newApp.getReservationId();
|
|
||||||
if (reservationIdStr != null && !reservationIdStr.isEmpty()) {
|
|
||||||
ReservationId reservationId =
|
|
||||||
ReservationId.parseReservationId(reservationIdStr);
|
|
||||||
appContext.setReservationID(reservationId);
|
|
||||||
}
|
|
||||||
return appContext;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Resource createAppSubmissionContextResource(
|
|
||||||
ApplicationSubmissionContextInfo newApp) throws BadRequestException {
|
|
||||||
if (newApp.getResource().getvCores() > rm.getConfig().getInt(
|
|
||||||
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)) {
|
|
||||||
String msg = "Requested more cores than configured max";
|
|
||||||
throw new BadRequestException(msg);
|
|
||||||
}
|
|
||||||
if (newApp.getResource().getMemorySize() > rm.getConfig().getInt(
|
|
||||||
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)) {
|
|
||||||
String msg = "Requested more memory than configured max";
|
|
||||||
throw new BadRequestException(msg);
|
|
||||||
}
|
|
||||||
Resource r = Resource.newInstance(newApp.getResource().getMemorySize(),
|
|
||||||
newApp.getResource().getvCores());
|
|
||||||
return r;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create the ContainerLaunchContext required for the
|
|
||||||
* ApplicationSubmissionContext. This function takes the user information and
|
|
||||||
* generates the ByteBuffer structures required by the ContainerLaunchContext
|
|
||||||
*
|
|
||||||
* @param newApp the information provided by the user
|
|
||||||
* @return created context
|
|
||||||
* @throws BadRequestException
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
protected ContainerLaunchContext createContainerLaunchContext(
|
|
||||||
ApplicationSubmissionContextInfo newApp)
|
|
||||||
throws BadRequestException, IOException {
|
|
||||||
|
|
||||||
// create container launch context
|
|
||||||
|
|
||||||
HashMap<String, ByteBuffer> hmap = new HashMap<String, ByteBuffer>();
|
|
||||||
for (Map.Entry<String, String> entry : newApp
|
|
||||||
.getContainerLaunchContextInfo().getAuxillaryServiceData().entrySet()) {
|
|
||||||
if (entry.getValue().isEmpty() == false) {
|
|
||||||
Base64 decoder = new Base64(0, null, true);
|
|
||||||
byte[] data = decoder.decode(entry.getValue());
|
|
||||||
hmap.put(entry.getKey(), ByteBuffer.wrap(data));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
HashMap<String, LocalResource> hlr = new HashMap<String, LocalResource>();
|
|
||||||
for (Map.Entry<String, LocalResourceInfo> entry : newApp
|
|
||||||
.getContainerLaunchContextInfo().getResources().entrySet()) {
|
|
||||||
LocalResourceInfo l = entry.getValue();
|
|
||||||
LocalResource lr = LocalResource.newInstance(URL.fromURI(l.getUrl()),
|
|
||||||
l.getType(), l.getVisibility(), l.getSize(), l.getTimestamp());
|
|
||||||
hlr.put(entry.getKey(), lr);
|
|
||||||
}
|
|
||||||
|
|
||||||
DataOutputBuffer out = new DataOutputBuffer();
|
|
||||||
Credentials cs = createCredentials(
|
|
||||||
newApp.getContainerLaunchContextInfo().getCredentials());
|
|
||||||
cs.writeTokenStorageToStream(out);
|
|
||||||
ByteBuffer tokens = ByteBuffer.wrap(out.getData());
|
|
||||||
|
|
||||||
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(hlr,
|
|
||||||
newApp.getContainerLaunchContextInfo().getEnvironment(),
|
|
||||||
newApp.getContainerLaunchContextInfo().getCommands(), hmap, tokens,
|
|
||||||
newApp.getContainerLaunchContextInfo().getAcls());
|
|
||||||
|
|
||||||
return ctx;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Generate a Credentials object from the information in the CredentialsInfo
|
|
||||||
* object.
|
|
||||||
*
|
|
||||||
* @param credentials the CredentialsInfo provided by the user.
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
private Credentials createCredentials(CredentialsInfo credentials) {
|
|
||||||
Credentials ret = new Credentials();
|
|
||||||
try {
|
|
||||||
for (Map.Entry<String, String> entry : credentials.getTokens()
|
|
||||||
.entrySet()) {
|
|
||||||
Text alias = new Text(entry.getKey());
|
|
||||||
Token<TokenIdentifier> token = new Token<TokenIdentifier>();
|
|
||||||
token.decodeFromUrlString(entry.getValue());
|
|
||||||
ret.addToken(alias, token);
|
|
||||||
}
|
|
||||||
for (Map.Entry<String, String> entry : credentials.getSecrets()
|
|
||||||
.entrySet()) {
|
|
||||||
Text alias = new Text(entry.getKey());
|
|
||||||
Base64 decoder = new Base64(0, null, true);
|
|
||||||
byte[] secret = decoder.decode(entry.getValue());
|
|
||||||
ret.addSecretKey(alias, secret);
|
|
||||||
}
|
|
||||||
} catch (IOException ie) {
|
|
||||||
throw new BadRequestException(
|
|
||||||
"Could not parse credentials data; exception message = "
|
|
||||||
+ ie.getMessage());
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
private UserGroupInformation createKerberosUserGroupInformation(
|
private UserGroupInformation createKerberosUserGroupInformation(
|
||||||
HttpServletRequest hsr) throws AuthorizationException, YarnException {
|
HttpServletRequest hsr) throws AuthorizationException, YarnException {
|
||||||
|
|
||||||
|
@ -1815,17 +1659,6 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
return callerUGI;
|
return callerUGI;
|
||||||
}
|
}
|
||||||
|
|
||||||
private LogAggregationContext createLogAggregationContext(
|
|
||||||
LogAggregationContextInfo logAggregationContextInfo) {
|
|
||||||
return LogAggregationContext.newInstance(
|
|
||||||
logAggregationContextInfo.getIncludePattern(),
|
|
||||||
logAggregationContextInfo.getExcludePattern(),
|
|
||||||
logAggregationContextInfo.getRolledLogsIncludePattern(),
|
|
||||||
logAggregationContextInfo.getRolledLogsExcludePattern(),
|
|
||||||
logAggregationContextInfo.getLogAggregationPolicyClassName(),
|
|
||||||
logAggregationContextInfo.getLogAggregationPolicyParameters());
|
|
||||||
}
|
|
||||||
|
|
||||||
@POST
|
@POST
|
||||||
@Path(RMWSConsts.DELEGATION_TOKEN)
|
@Path(RMWSConsts.DELEGATION_TOKEN)
|
||||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
|
Loading…
Reference in New Issue