YARN-2884. Added a proxy service in NM to proxy the the communication between AM and RM. Contributed by Kishore Chaliparambil
(cherry picked from commit 6f72f1e600
)
This commit is contained in:
parent
a0b7ef15d0
commit
f23be93dd1
|
@ -132,6 +132,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-3970. Add REST api support for Application Priority.
|
||||
(Naganarasimha G R via vvasudev)
|
||||
|
||||
YARN-2884. Added a proxy service in NM to proxy the the communication
|
||||
between AM and RM. (Kishore Chaliparambil via jianhe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
|
|
@ -1332,6 +1332,23 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
|
||||
+ "application.classpath";
|
||||
|
||||
public static final String AMRM_PROXY_ENABLED = NM_PREFIX
|
||||
+ "amrmproxy.enable";
|
||||
public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
|
||||
public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
|
||||
+ "amrmproxy.address";
|
||||
public static final int DEFAULT_AMRM_PROXY_PORT = 8048;
|
||||
public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:"
|
||||
+ DEFAULT_AMRM_PROXY_PORT;
|
||||
public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX
|
||||
+ "amrmproxy.client.thread-count";
|
||||
public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25;
|
||||
public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
|
||||
NM_PREFIX + "amrmproxy.interceptor-class.pipeline";
|
||||
public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
|
||||
"org.apache.hadoop.yarn.server.nodemanager.amrmproxy."
|
||||
+ "DefaultRequestInterceptor";
|
||||
|
||||
/**
|
||||
* Default platform-agnostic CLASSPATH for YARN applications. A
|
||||
* comma-separated list of CLASSPATH entries. The parameter expansion marker
|
||||
|
|
|
@ -86,6 +86,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
|
|||
.add(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
|
||||
configurationPropsToSkipCompare
|
||||
.add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
|
||||
configurationPropsToSkipCompare
|
||||
.add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
|
||||
|
||||
// Ignore all YARN Application Timeline Service (version 1) properties
|
||||
configurationPrefixToSkipCompare.add("yarn.timeline-service.");
|
||||
|
|
|
@ -2259,4 +2259,38 @@
|
|||
<value></value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Enable/Disable AMRMProxyService in the node manager. This service is used to intercept
|
||||
calls from the application masters to the resource manager.
|
||||
</description>
|
||||
<name>yarn.nodemanager.amrmproxy.enable</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The address of the AMRMProxyService listener.
|
||||
</description>
|
||||
<name>yarn.nodemanager.amrmproxy.address</name>
|
||||
<value>0.0.0.0:8048</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The number of threads used to handle requests by the AMRMProxyService.
|
||||
</description>
|
||||
<name>yarn.nodemanager.amrmproxy.client.thread-count</name>
|
||||
<value>25</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The comma separated list of class names that implement the RequestInterceptor interface. This is used by the
|
||||
AMRMProxyService to create the request processing pipeline for applications.
|
||||
</description>
|
||||
<name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
|
||||
<value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -0,0 +1,142 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.utils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Utility class that contains commonly used server methods.
|
||||
*
|
||||
*/
|
||||
@Private
|
||||
public final class YarnServerSecurityUtils {
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(YarnServerSecurityUtils.class);
|
||||
|
||||
private YarnServerSecurityUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Authorizes the current request and returns the AMRMTokenIdentifier for the
|
||||
* current application.
|
||||
*
|
||||
* @return the AMRMTokenIdentifier instance for the current user
|
||||
* @throws YarnException
|
||||
*/
|
||||
public static AMRMTokenIdentifier authorizeRequest()
|
||||
throws YarnException {
|
||||
|
||||
UserGroupInformation remoteUgi;
|
||||
try {
|
||||
remoteUgi = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException e) {
|
||||
String msg =
|
||||
"Cannot obtain the user-name for authorizing ApplicationMaster. "
|
||||
+ "Got exception: " + StringUtils.stringifyException(e);
|
||||
LOG.warn(msg);
|
||||
throw RPCUtil.getRemoteException(msg);
|
||||
}
|
||||
|
||||
boolean tokenFound = false;
|
||||
String message = "";
|
||||
AMRMTokenIdentifier appTokenIdentifier = null;
|
||||
try {
|
||||
appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
|
||||
if (appTokenIdentifier == null) {
|
||||
tokenFound = false;
|
||||
message = "No AMRMToken found for user " + remoteUgi.getUserName();
|
||||
} else {
|
||||
tokenFound = true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
tokenFound = false;
|
||||
message =
|
||||
"Got exception while looking for AMRMToken for user "
|
||||
+ remoteUgi.getUserName();
|
||||
}
|
||||
|
||||
if (!tokenFound) {
|
||||
LOG.warn(message);
|
||||
throw RPCUtil.getRemoteException(message);
|
||||
}
|
||||
|
||||
return appTokenIdentifier;
|
||||
}
|
||||
|
||||
// Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
|
||||
// currently sets only the required id, but iterate through anyways just to be
|
||||
// sure.
|
||||
private static AMRMTokenIdentifier selectAMRMTokenIdentifier(
|
||||
UserGroupInformation remoteUgi) throws IOException {
|
||||
AMRMTokenIdentifier result = null;
|
||||
Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
|
||||
for (TokenIdentifier tokenId : tokenIds) {
|
||||
if (tokenId instanceof AMRMTokenIdentifier) {
|
||||
result = (AMRMTokenIdentifier) tokenId;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the container launch context and returns a Credential instance that
|
||||
* contains all the tokens from the launch context.
|
||||
* @param launchContext
|
||||
* @return the credential instance
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Credentials parseCredentials(
|
||||
ContainerLaunchContext launchContext) throws IOException {
|
||||
Credentials credentials = new Credentials();
|
||||
ByteBuffer tokens = launchContext.getTokens();
|
||||
|
||||
if (tokens != null) {
|
||||
DataInputByteBuffer buf = new DataInputByteBuffer();
|
||||
tokens.rewind();
|
||||
buf.reset(tokens);
|
||||
credentials.readTokenStorageStream(buf);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
for (Token<? extends TokenIdentifier> tk : credentials
|
||||
.getAllTokens()) {
|
||||
LOG.debug(tk.getService() + " = " + tk.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return credentials;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
|
||||
/**
|
||||
* Interface that can be used by the intercepter plugins to get the information
|
||||
* about one application.
|
||||
*
|
||||
*/
|
||||
public interface AMRMProxyApplicationContext {
|
||||
|
||||
/**
|
||||
* Gets the configuration object instance.
|
||||
* @return the configuration object.
|
||||
*/
|
||||
Configuration getConf();
|
||||
|
||||
/**
|
||||
* Gets the application attempt identifier.
|
||||
* @return the application attempt identifier.
|
||||
*/
|
||||
ApplicationAttemptId getApplicationAttemptId();
|
||||
|
||||
/**
|
||||
* Gets the application submitter.
|
||||
* @return the application submitter
|
||||
*/
|
||||
String getUser();
|
||||
|
||||
/**
|
||||
* Gets the application's AMRMToken that is issued by the RM.
|
||||
* @return the application's AMRMToken that is issued by the RM.
|
||||
*/
|
||||
Token<AMRMTokenIdentifier> getAMRMToken();
|
||||
|
||||
/**
|
||||
* Gets the application's local AMRMToken issued by the proxy service.
|
||||
* @return the application's local AMRMToken issued by the proxy service.
|
||||
*/
|
||||
Token<AMRMTokenIdentifier> getLocalAMRMToken();
|
||||
|
||||
/**
|
||||
* Gets the NMContext object.
|
||||
* @return the NMContext.
|
||||
*/
|
||||
Context getNMCotext();
|
||||
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
|
||||
/**
|
||||
* Encapsulates the information about one application that is needed by the
|
||||
* request intercepters.
|
||||
*
|
||||
*/
|
||||
public class AMRMProxyApplicationContextImpl implements
|
||||
AMRMProxyApplicationContext {
|
||||
private final Configuration conf;
|
||||
private final Context nmContext;
|
||||
private final ApplicationAttemptId applicationAttemptId;
|
||||
private final String user;
|
||||
private Integer localTokenKeyId;
|
||||
private Token<AMRMTokenIdentifier> amrmToken;
|
||||
private Token<AMRMTokenIdentifier> localToken;
|
||||
|
||||
/**
|
||||
* Create an instance of the AMRMProxyApplicationContext.
|
||||
*
|
||||
* @param nmContext
|
||||
* @param conf
|
||||
* @param applicationAttemptId
|
||||
* @param user
|
||||
* @param amrmToken
|
||||
*/
|
||||
public AMRMProxyApplicationContextImpl(Context nmContext,
|
||||
Configuration conf, ApplicationAttemptId applicationAttemptId,
|
||||
String user, Token<AMRMTokenIdentifier> amrmToken,
|
||||
Token<AMRMTokenIdentifier> localToken) {
|
||||
this.nmContext = nmContext;
|
||||
this.conf = conf;
|
||||
this.applicationAttemptId = applicationAttemptId;
|
||||
this.user = user;
|
||||
this.amrmToken = amrmToken;
|
||||
this.localToken = localToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationAttemptId getApplicationAttemptId() {
|
||||
return applicationAttemptId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Token<AMRMTokenIdentifier> getAMRMToken() {
|
||||
return amrmToken;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the application's AMRMToken.
|
||||
*/
|
||||
public synchronized void setAMRMToken(
|
||||
Token<AMRMTokenIdentifier> amrmToken) {
|
||||
this.amrmToken = amrmToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Token<AMRMTokenIdentifier> getLocalAMRMToken() {
|
||||
return this.localToken;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the application's AMRMToken.
|
||||
*/
|
||||
public synchronized void setLocalAMRMToken(
|
||||
Token<AMRMTokenIdentifier> localToken) {
|
||||
this.localToken = localToken;
|
||||
this.localTokenKeyId = null;
|
||||
}
|
||||
|
||||
@Private
|
||||
public synchronized int getLocalAMRMTokenKeyId() {
|
||||
Integer keyId = this.localTokenKeyId;
|
||||
if (keyId == null) {
|
||||
try {
|
||||
if (this.localToken == null) {
|
||||
throw new YarnRuntimeException("Missing AMRM token for "
|
||||
+ this.applicationAttemptId);
|
||||
}
|
||||
keyId = this.amrmToken.decodeIdentifier().getKeyId();
|
||||
this.localTokenKeyId = keyId;
|
||||
} catch (IOException e) {
|
||||
throw new YarnRuntimeException("AMRM token decode error for "
|
||||
+ this.applicationAttemptId, e);
|
||||
}
|
||||
}
|
||||
return keyId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Context getNMCotext() {
|
||||
return nmContext;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,592 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SaslRpcServer;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* AMRMProxyService is a service that runs on each node manager that can be used
|
||||
* to intercept and inspect messages from application master to the cluster
|
||||
* resource manager. It listens to messages from the application master and
|
||||
* creates a request intercepting pipeline instance for each application. The
|
||||
* pipeline is a chain of intercepter instances that can inspect and modify the
|
||||
* request/response as needed.
|
||||
*/
|
||||
public class AMRMProxyService extends AbstractService implements
|
||||
ApplicationMasterProtocol {
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(AMRMProxyService.class);
|
||||
private Server server;
|
||||
private final Context nmContext;
|
||||
private final AsyncDispatcher dispatcher;
|
||||
private InetSocketAddress listenerEndpoint;
|
||||
private AMRMProxyTokenSecretManager secretManager;
|
||||
private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
|
||||
|
||||
/**
|
||||
* Creates an instance of the service.
|
||||
*
|
||||
* @param nmContext
|
||||
* @param dispatcher
|
||||
*/
|
||||
public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) {
|
||||
super(AMRMProxyService.class.getName());
|
||||
Preconditions.checkArgument(nmContext != null, "nmContext is null");
|
||||
Preconditions.checkArgument(dispatcher != null, "dispatcher is null");
|
||||
this.nmContext = nmContext;
|
||||
this.dispatcher = dispatcher;
|
||||
this.applPipelineMap =
|
||||
new ConcurrentHashMap<ApplicationId, RequestInterceptorChainWrapper>();
|
||||
|
||||
this.dispatcher.register(ApplicationEventType.class,
|
||||
new ApplicationEventHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
LOG.info("Starting AMRMProxyService");
|
||||
Configuration conf = getConfig();
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
this.listenerEndpoint =
|
||||
conf.getSocketAddr(YarnConfiguration.AMRM_PROXY_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AMRM_PROXY_PORT);
|
||||
|
||||
Configuration serverConf = new Configuration(conf);
|
||||
serverConf.set(
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
SaslRpcServer.AuthMethod.TOKEN.toString());
|
||||
|
||||
int numWorkerThreads =
|
||||
serverConf.getInt(
|
||||
YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT,
|
||||
YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT);
|
||||
|
||||
this.secretManager = new AMRMProxyTokenSecretManager(serverConf);
|
||||
this.secretManager.start();
|
||||
|
||||
this.server =
|
||||
rpc.getServer(ApplicationMasterProtocol.class, this,
|
||||
listenerEndpoint, serverConf, this.secretManager,
|
||||
numWorkerThreads);
|
||||
|
||||
this.server.start();
|
||||
LOG.info("AMRMProxyService listening on address: "
|
||||
+ this.server.getListenerAddress());
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
LOG.info("Stopping AMRMProxyService");
|
||||
if (this.server != null) {
|
||||
this.server.stop();
|
||||
}
|
||||
|
||||
this.secretManager.stop();
|
||||
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* This is called by the AMs started on this node to register with the RM.
|
||||
* This method does the initial authorization and then forwards the request to
|
||||
* the application instance specific intercepter chain.
|
||||
*/
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
LOG.info("Registering application master." + " Host:"
|
||||
+ request.getHost() + " Port:" + request.getRpcPort()
|
||||
+ " Tracking Url:" + request.getTrackingUrl());
|
||||
RequestInterceptorChainWrapper pipeline =
|
||||
authorizeAndGetInterceptorChain();
|
||||
return pipeline.getRootInterceptor()
|
||||
.registerApplicationMaster(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is called by the AMs started on this node to unregister from the RM.
|
||||
* This method does the initial authorization and then forwards the request to
|
||||
* the application instance specific intercepter chain.
|
||||
*/
|
||||
@Override
|
||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
FinishApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
LOG.info("Finishing application master. Tracking Url:"
|
||||
+ request.getTrackingUrl());
|
||||
RequestInterceptorChainWrapper pipeline =
|
||||
authorizeAndGetInterceptorChain();
|
||||
return pipeline.getRootInterceptor().finishApplicationMaster(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is called by the AMs started on this node to send heart beat to RM.
|
||||
* This method does the initial authorization and then forwards the request to
|
||||
* the application instance specific pipeline, which is a chain of request
|
||||
* intercepter objects. One application request processing pipeline is created
|
||||
* per AM instance.
|
||||
*/
|
||||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException, IOException {
|
||||
AMRMTokenIdentifier amrmTokenIdentifier =
|
||||
YarnServerSecurityUtils.authorizeRequest();
|
||||
RequestInterceptorChainWrapper pipeline =
|
||||
getInterceptorChain(amrmTokenIdentifier);
|
||||
AllocateResponse allocateResponse =
|
||||
pipeline.getRootInterceptor().allocate(request);
|
||||
|
||||
updateAMRMTokens(amrmTokenIdentifier, pipeline, allocateResponse);
|
||||
|
||||
return allocateResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback from the ContainerManager implementation for initializing the
|
||||
* application request processing pipeline.
|
||||
*
|
||||
* @param request - encapsulates information for starting an AM
|
||||
* @throws IOException
|
||||
* @throws YarnException
|
||||
*/
|
||||
public void processApplicationStartRequest(StartContainerRequest request)
|
||||
throws IOException, YarnException {
|
||||
LOG.info("Callback received for initializing request "
|
||||
+ "processing pipeline for an AM");
|
||||
ContainerTokenIdentifier containerTokenIdentifierForKey =
|
||||
BuilderUtils.newContainerTokenIdentifier(request
|
||||
.getContainerToken());
|
||||
ApplicationAttemptId appAttemptId =
|
||||
containerTokenIdentifierForKey.getContainerID()
|
||||
.getApplicationAttemptId();
|
||||
Credentials credentials =
|
||||
YarnServerSecurityUtils.parseCredentials(request
|
||||
.getContainerLaunchContext());
|
||||
|
||||
Token<AMRMTokenIdentifier> amrmToken =
|
||||
getFirstAMRMToken(credentials.getAllTokens());
|
||||
if (amrmToken == null) {
|
||||
throw new YarnRuntimeException(
|
||||
"AMRMToken not found in the start container request for application:"
|
||||
+ appAttemptId.toString());
|
||||
}
|
||||
|
||||
// Substitute the existing AMRM Token with a local one. Keep the rest of the
|
||||
// tokens in the credentials intact.
|
||||
Token<AMRMTokenIdentifier> localToken =
|
||||
this.secretManager.createAndGetAMRMToken(appAttemptId);
|
||||
credentials.addToken(localToken.getService(), localToken);
|
||||
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
credentials.writeTokenStorageToStream(dob);
|
||||
request.getContainerLaunchContext().setTokens(
|
||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
|
||||
|
||||
initializePipeline(containerTokenIdentifierForKey.getContainerID()
|
||||
.getApplicationAttemptId(),
|
||||
containerTokenIdentifierForKey.getApplicationSubmitter(),
|
||||
amrmToken, localToken);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the request intercepter pipeline for the specified application.
|
||||
*
|
||||
* @param applicationAttemptId
|
||||
* @param user
|
||||
* @param amrmToken
|
||||
*/
|
||||
protected void initializePipeline(
|
||||
ApplicationAttemptId applicationAttemptId, String user,
|
||||
Token<AMRMTokenIdentifier> amrmToken,
|
||||
Token<AMRMTokenIdentifier> localToken) {
|
||||
RequestInterceptorChainWrapper chainWrapper = null;
|
||||
synchronized (applPipelineMap) {
|
||||
if (applPipelineMap.containsKey(applicationAttemptId.getApplicationId())) {
|
||||
LOG.warn("Request to start an already existing appId was received. "
|
||||
+ " This can happen if an application failed and a new attempt "
|
||||
+ "was created on this machine. ApplicationId: "
|
||||
+ applicationAttemptId.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
chainWrapper = new RequestInterceptorChainWrapper();
|
||||
this.applPipelineMap.put(applicationAttemptId.getApplicationId(),
|
||||
chainWrapper);
|
||||
}
|
||||
|
||||
// We register the pipeline instance in the map first and then initialize it
|
||||
// later because chain initialization can be expensive and we would like to
|
||||
// release the lock as soon as possible to prevent other applications from
|
||||
// blocking when one application's chain is initializing
|
||||
LOG.info("Initializing request processing pipeline for application. "
|
||||
+ " ApplicationId:" + applicationAttemptId + " for the user: "
|
||||
+ user);
|
||||
|
||||
RequestInterceptor interceptorChain =
|
||||
this.createRequestInterceptorChain();
|
||||
interceptorChain.init(createApplicationMasterContext(
|
||||
applicationAttemptId, user, amrmToken, localToken));
|
||||
chainWrapper.init(interceptorChain, applicationAttemptId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down the request processing pipeline for the specified application
|
||||
* attempt id.
|
||||
*
|
||||
* @param applicationId
|
||||
*/
|
||||
protected void stopApplication(ApplicationId applicationId) {
|
||||
Preconditions.checkArgument(applicationId != null,
|
||||
"applicationId is null");
|
||||
RequestInterceptorChainWrapper pipeline =
|
||||
this.applPipelineMap.remove(applicationId);
|
||||
|
||||
if (pipeline == null) {
|
||||
LOG.info("Request to stop an application that does not exist. Id:"
|
||||
+ applicationId);
|
||||
} else {
|
||||
LOG.info("Stopping the request processing pipeline for application: "
|
||||
+ applicationId);
|
||||
try {
|
||||
pipeline.getRootInterceptor().shutdown();
|
||||
} catch (Throwable ex) {
|
||||
LOG.warn(
|
||||
"Failed to shutdown the request processing pipeline for app:"
|
||||
+ applicationId, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
|
||||
RequestInterceptorChainWrapper pipeline,
|
||||
AllocateResponse allocateResponse) {
|
||||
AMRMProxyApplicationContextImpl context =
|
||||
(AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor()
|
||||
.getApplicationContext();
|
||||
|
||||
// check to see if the RM has issued a new AMRMToken & accordingly update
|
||||
// the real ARMRMToken in the current context
|
||||
if (allocateResponse.getAMRMToken() != null) {
|
||||
org.apache.hadoop.yarn.api.records.Token token =
|
||||
allocateResponse.getAMRMToken();
|
||||
|
||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newTokenId =
|
||||
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
|
||||
token.getIdentifier().array(), token.getPassword().array(),
|
||||
new Text(token.getKind()), new Text(token.getService()));
|
||||
|
||||
context.setAMRMToken(newTokenId);
|
||||
}
|
||||
|
||||
// Check if the local AMRMToken is rolled up and update the context and
|
||||
// response accordingly
|
||||
MasterKeyData nextMasterKey =
|
||||
this.secretManager.getNextMasterKeyData();
|
||||
|
||||
if (nextMasterKey != null
|
||||
&& nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
|
||||
.getKeyId()) {
|
||||
Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken();
|
||||
if (nextMasterKey.getMasterKey().getKeyId() != context
|
||||
.getLocalAMRMTokenKeyId()) {
|
||||
LOG.info("The local AMRMToken has been rolled-over."
|
||||
+ " Send new local AMRMToken back to application: "
|
||||
+ pipeline.getApplicationId());
|
||||
localToken =
|
||||
this.secretManager.createAndGetAMRMToken(pipeline
|
||||
.getApplicationAttemptId());
|
||||
context.setLocalAMRMToken(localToken);
|
||||
}
|
||||
|
||||
allocateResponse
|
||||
.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
|
||||
.newInstance(localToken.getIdentifier(), localToken
|
||||
.getKind().toString(), localToken.getPassword(),
|
||||
localToken.getService().toString()));
|
||||
}
|
||||
}
|
||||
|
||||
private AMRMProxyApplicationContext createApplicationMasterContext(
|
||||
ApplicationAttemptId applicationAttemptId, String user,
|
||||
Token<AMRMTokenIdentifier> amrmToken,
|
||||
Token<AMRMTokenIdentifier> localToken) {
|
||||
AMRMProxyApplicationContextImpl appContext =
|
||||
new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(),
|
||||
applicationAttemptId, user, amrmToken, localToken);
|
||||
return appContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the Request intercepter chains for all the applications.
|
||||
*
|
||||
* @return the request intercepter chains.
|
||||
*/
|
||||
protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() {
|
||||
return this.applPipelineMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method creates and returns reference of the first intercepter in the
|
||||
* chain of request intercepter instances.
|
||||
*
|
||||
* @return the reference of the first intercepter in the chain
|
||||
*/
|
||||
protected RequestInterceptor createRequestInterceptorChain() {
|
||||
Configuration conf = getConfig();
|
||||
|
||||
List<String> interceptorClassNames = getInterceptorClassNames(conf);
|
||||
|
||||
RequestInterceptor pipeline = null;
|
||||
RequestInterceptor current = null;
|
||||
for (String interceptorClassName : interceptorClassNames) {
|
||||
try {
|
||||
Class<?> interceptorClass =
|
||||
conf.getClassByName(interceptorClassName);
|
||||
if (RequestInterceptor.class.isAssignableFrom(interceptorClass)) {
|
||||
RequestInterceptor interceptorInstance =
|
||||
(RequestInterceptor) ReflectionUtils.newInstance(
|
||||
interceptorClass, conf);
|
||||
if (pipeline == null) {
|
||||
pipeline = interceptorInstance;
|
||||
current = interceptorInstance;
|
||||
continue;
|
||||
} else {
|
||||
current.setNextInterceptor(interceptorInstance);
|
||||
current = interceptorInstance;
|
||||
}
|
||||
} else {
|
||||
throw new YarnRuntimeException("Class: " + interceptorClassName
|
||||
+ " not instance of "
|
||||
+ RequestInterceptor.class.getCanonicalName());
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new YarnRuntimeException(
|
||||
"Could not instantiate ApplicationMasterRequestInterceptor: "
|
||||
+ interceptorClassName, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (pipeline == null) {
|
||||
throw new YarnRuntimeException(
|
||||
"RequestInterceptor pipeline is not configured in the system");
|
||||
}
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the comma separated intercepter class names from the configuration.
|
||||
*
|
||||
* @param conf
|
||||
* @return the intercepter class names as an instance of ArrayList
|
||||
*/
|
||||
private List<String> getInterceptorClassNames(Configuration conf) {
|
||||
String configuredInterceptorClassNames =
|
||||
conf.get(
|
||||
YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
|
||||
YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
|
||||
|
||||
List<String> interceptorClassNames = new ArrayList<String>();
|
||||
Collection<String> tempList =
|
||||
StringUtils.getStringCollection(configuredInterceptorClassNames);
|
||||
for (String item : tempList) {
|
||||
interceptorClassNames.add(item.trim());
|
||||
}
|
||||
|
||||
return interceptorClassNames;
|
||||
}
|
||||
|
||||
/**
|
||||
* Authorizes the request and returns the application specific request
|
||||
* processing pipeline.
|
||||
*
|
||||
* @return the the intercepter wrapper instance
|
||||
* @throws YarnException
|
||||
*/
|
||||
private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
|
||||
throws YarnException {
|
||||
AMRMTokenIdentifier tokenIdentifier =
|
||||
YarnServerSecurityUtils.authorizeRequest();
|
||||
return getInterceptorChain(tokenIdentifier);
|
||||
}
|
||||
|
||||
private RequestInterceptorChainWrapper getInterceptorChain(
|
||||
AMRMTokenIdentifier tokenIdentifier) throws YarnException {
|
||||
ApplicationAttemptId appAttemptId =
|
||||
tokenIdentifier.getApplicationAttemptId();
|
||||
|
||||
synchronized (this.applPipelineMap) {
|
||||
if (!this.applPipelineMap.containsKey(appAttemptId
|
||||
.getApplicationId())) {
|
||||
throw new YarnException(
|
||||
"The AM request processing pipeline is not initialized for app: "
|
||||
+ appAttemptId.getApplicationId().toString());
|
||||
}
|
||||
|
||||
return this.applPipelineMap.get(appAttemptId.getApplicationId());
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Token<AMRMTokenIdentifier> getFirstAMRMToken(
|
||||
Collection<Token<? extends TokenIdentifier>> allTokens) {
|
||||
Iterator<Token<? extends TokenIdentifier>> iter = allTokens.iterator();
|
||||
while (iter.hasNext()) {
|
||||
Token<? extends TokenIdentifier> token = iter.next();
|
||||
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
|
||||
return (Token<AMRMTokenIdentifier>) token;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Private class for handling application stop events.
|
||||
*
|
||||
*/
|
||||
class ApplicationEventHandler implements EventHandler<ApplicationEvent> {
|
||||
|
||||
@Override
|
||||
public void handle(ApplicationEvent event) {
|
||||
Application app =
|
||||
AMRMProxyService.this.nmContext.getApplications().get(
|
||||
event.getApplicationID());
|
||||
if (app != null) {
|
||||
switch (event.getType()) {
|
||||
case FINISH_APPLICATION:
|
||||
LOG.info("Application stop event received for stopping AppId:"
|
||||
+ event.getApplicationID().toString());
|
||||
AMRMProxyService.this.stopApplication(event.getApplicationID());
|
||||
break;
|
||||
default:
|
||||
LOG.debug("AMRMProxy is ignoring event: " + event.getType());
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Event " + event + " sent to absent application "
|
||||
+ event.getApplicationID());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Private structure for encapsulating RequestInterceptor and
|
||||
* ApplicationAttemptId instances.
|
||||
*
|
||||
*/
|
||||
private static class RequestInterceptorChainWrapper {
|
||||
private RequestInterceptor rootInterceptor;
|
||||
private ApplicationAttemptId applicationAttemptId;
|
||||
|
||||
/**
|
||||
* Initializes the wrapper with the specified parameters.
|
||||
*
|
||||
* @param rootInterceptor
|
||||
* @param applicationAttemptId
|
||||
*/
|
||||
public synchronized void init(RequestInterceptor rootInterceptor,
|
||||
ApplicationAttemptId applicationAttemptId) {
|
||||
this.rootInterceptor = rootInterceptor;
|
||||
this.applicationAttemptId = applicationAttemptId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the root request intercepter.
|
||||
*
|
||||
* @return the root request intercepter
|
||||
*/
|
||||
public synchronized RequestInterceptor getRootInterceptor() {
|
||||
return rootInterceptor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the application attempt identifier.
|
||||
*
|
||||
* @return the application attempt identifier
|
||||
*/
|
||||
public synchronized ApplicationAttemptId getApplicationAttemptId() {
|
||||
return applicationAttemptId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the application identifier.
|
||||
*
|
||||
* @return the application identifier
|
||||
*/
|
||||
public synchronized ApplicationId getApplicationId() {
|
||||
return applicationAttemptId.getApplicationId();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,265 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* This secret manager instance is used by the AMRMProxyService to generate and
|
||||
* manage tokens.
|
||||
*/
|
||||
public class AMRMProxyTokenSecretManager extends
|
||||
SecretManager<AMRMTokenIdentifier> {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(AMRMProxyTokenSecretManager.class);
|
||||
|
||||
private int serialNo = new SecureRandom().nextInt();
|
||||
private MasterKeyData nextMasterKey;
|
||||
private MasterKeyData currentMasterKey;
|
||||
|
||||
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||
private final Lock readLock = readWriteLock.readLock();
|
||||
private final Lock writeLock = readWriteLock.writeLock();
|
||||
|
||||
private final Timer timer;
|
||||
private final long rollingInterval;
|
||||
private final long activationDelay;
|
||||
|
||||
private final Set<ApplicationAttemptId> appAttemptSet =
|
||||
new HashSet<ApplicationAttemptId>();
|
||||
|
||||
/**
|
||||
* Create an {@link AMRMProxyTokenSecretManager}.
|
||||
*/
|
||||
public AMRMProxyTokenSecretManager(Configuration conf) {
|
||||
this.timer = new Timer();
|
||||
this.rollingInterval =
|
||||
conf.getLong(
|
||||
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
||||
YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
|
||||
// Adding delay = 1.5 * expiry interval makes sure that all active AMs get
|
||||
// the updated shared-key.
|
||||
this.activationDelay =
|
||||
(long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
|
||||
LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
|
||||
+ "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay
|
||||
+ " ms");
|
||||
if (rollingInterval <= activationDelay * 2) {
|
||||
throw new IllegalArgumentException(
|
||||
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
|
||||
+ " should be more than 3 X "
|
||||
+ YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
|
||||
}
|
||||
}
|
||||
|
||||
public void start() {
|
||||
if (this.currentMasterKey == null) {
|
||||
this.currentMasterKey = createNewMasterKey();
|
||||
}
|
||||
this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
|
||||
rollingInterval);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
this.timer.cancel();
|
||||
}
|
||||
|
||||
public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
LOG.info("Application finished, removing password for "
|
||||
+ appAttemptId);
|
||||
this.appAttemptSet.remove(appAttemptId);
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private class MasterKeyRoller extends TimerTask {
|
||||
@Override
|
||||
public void run() {
|
||||
rollMasterKey();
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
void rollMasterKey() {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
LOG.info("Rolling master-key for amrm-tokens");
|
||||
this.nextMasterKey = createNewMasterKey();
|
||||
this.timer.schedule(new NextKeyActivator(), this.activationDelay);
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private class NextKeyActivator extends TimerTask {
|
||||
@Override
|
||||
public void run() {
|
||||
activateNextMasterKey();
|
||||
}
|
||||
}
|
||||
|
||||
public void activateNextMasterKey() {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
LOG.info("Activating next master key with id: "
|
||||
+ this.nextMasterKey.getMasterKey().getKeyId());
|
||||
this.currentMasterKey = this.nextMasterKey;
|
||||
this.nextMasterKey = null;
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public MasterKeyData createNewMasterKey() {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
return new MasterKeyData(serialNo++, generateSecret());
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
|
||||
ApplicationAttemptId appAttemptId) {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
|
||||
AMRMTokenIdentifier identifier =
|
||||
new AMRMTokenIdentifier(appAttemptId, getMasterKey()
|
||||
.getMasterKey().getKeyId());
|
||||
byte[] password = this.createPassword(identifier);
|
||||
appAttemptSet.add(appAttemptId);
|
||||
return new Token<AMRMTokenIdentifier>(identifier.getBytes(),
|
||||
password, identifier.getKind(), new Text());
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
// If nextMasterKey is not Null, then return nextMasterKey
|
||||
// otherwise return currentMasterKey.
|
||||
@VisibleForTesting
|
||||
public MasterKeyData getMasterKey() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return nextMasterKey == null ? currentMasterKey : nextMasterKey;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the password for the given {@link AMRMTokenIdentifier}. Used by
|
||||
* RPC layer to validate a remote {@link AMRMTokenIdentifier}.
|
||||
*/
|
||||
@Override
|
||||
public byte[] retrievePassword(AMRMTokenIdentifier identifier)
|
||||
throws InvalidToken {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
identifier.getApplicationAttemptId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Trying to retrieve password for "
|
||||
+ applicationAttemptId);
|
||||
}
|
||||
if (!appAttemptSet.contains(applicationAttemptId)) {
|
||||
throw new InvalidToken(applicationAttemptId
|
||||
+ " not found in AMRMProxyTokenSecretManager.");
|
||||
}
|
||||
if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
|
||||
.getKeyId()) {
|
||||
return createPassword(identifier.getBytes(),
|
||||
this.currentMasterKey.getSecretKey());
|
||||
} else if (nextMasterKey != null
|
||||
&& identifier.getKeyId() == this.nextMasterKey.getMasterKey()
|
||||
.getKeyId()) {
|
||||
return createPassword(identifier.getBytes(),
|
||||
this.nextMasterKey.getSecretKey());
|
||||
}
|
||||
throw new InvalidToken("Invalid AMRMToken from "
|
||||
+ applicationAttemptId);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an empty TokenId to be used for de-serializing an
|
||||
* {@link AMRMTokenIdentifier} by the RPC layer.
|
||||
*/
|
||||
@Override
|
||||
public AMRMTokenIdentifier createIdentifier() {
|
||||
return new AMRMTokenIdentifier();
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public MasterKeyData getNextMasterKeyData() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.nextMasterKey;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Private
|
||||
protected byte[] createPassword(AMRMTokenIdentifier identifier) {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
identifier.getApplicationAttemptId();
|
||||
LOG.info("Creating password for " + applicationAttemptId);
|
||||
return createPassword(identifier.getBytes(), getMasterKey()
|
||||
.getSecretKey());
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Implements the RequestInterceptor interface and provides common functionality
|
||||
* which can can be used and/or extended by other concrete intercepter classes.
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractRequestInterceptor implements
|
||||
RequestInterceptor {
|
||||
private Configuration conf;
|
||||
private AMRMProxyApplicationContext appContext;
|
||||
private RequestInterceptor nextInterceptor;
|
||||
|
||||
/**
|
||||
* Sets the {@link RequestInterceptor} in the chain.
|
||||
*/
|
||||
@Override
|
||||
public void setNextInterceptor(RequestInterceptor nextInterceptor) {
|
||||
this.nextInterceptor = nextInterceptor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the {@link Configuration}.
|
||||
*/
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
if (this.nextInterceptor != null) {
|
||||
this.nextInterceptor.setConf(conf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the {@link Configuration}.
|
||||
*/
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return this.conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the {@link RequestInterceptor}.
|
||||
*/
|
||||
@Override
|
||||
public void init(AMRMProxyApplicationContext appContext) {
|
||||
Preconditions.checkState(this.appContext == null,
|
||||
"init is called multiple times on this interceptor: "
|
||||
+ this.getClass().getName());
|
||||
this.appContext = appContext;
|
||||
if (this.nextInterceptor != null) {
|
||||
this.nextInterceptor.init(appContext);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disposes the {@link RequestInterceptor}.
|
||||
*/
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (this.nextInterceptor != null) {
|
||||
this.nextInterceptor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the next {@link RequestInterceptor} in the chain.
|
||||
*/
|
||||
@Override
|
||||
public RequestInterceptor getNextInterceptor() {
|
||||
return this.nextInterceptor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the {@link AMRMProxyApplicationContext}.
|
||||
*/
|
||||
public AMRMProxyApplicationContext getApplicationContext() {
|
||||
return this.appContext;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,138 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Extends the AbstractRequestInterceptor class and provides an implementation
|
||||
* that simply forwards the AM requests to the cluster resource manager.
|
||||
*
|
||||
*/
|
||||
public final class DefaultRequestInterceptor extends
|
||||
AbstractRequestInterceptor {
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(DefaultRequestInterceptor.class);
|
||||
private ApplicationMasterProtocol rmClient;
|
||||
private UserGroupInformation user = null;
|
||||
|
||||
@Override
|
||||
public void init(AMRMProxyApplicationContext appContext) {
|
||||
super.init(appContext);
|
||||
try {
|
||||
user =
|
||||
UserGroupInformation.createProxyUser(appContext
|
||||
.getApplicationAttemptId().toString(), UserGroupInformation
|
||||
.getCurrentUser());
|
||||
user.addToken(appContext.getAMRMToken());
|
||||
final Configuration conf = this.getConf();
|
||||
|
||||
rmClient =
|
||||
user.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
|
||||
@Override
|
||||
public ApplicationMasterProtocol run() throws Exception {
|
||||
return ClientRMProxy.createRMProxy(conf,
|
||||
ApplicationMasterProtocol.class);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
String message =
|
||||
"Error while creating of RM app master service proxy for attemptId:"
|
||||
+ appContext.getApplicationAttemptId().toString();
|
||||
if (user != null) {
|
||||
message += ", user: " + user;
|
||||
}
|
||||
|
||||
LOG.info(message);
|
||||
throw new YarnRuntimeException(message, e);
|
||||
} catch (Exception e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
final RegisterApplicationMasterRequest request)
|
||||
throws YarnException, IOException {
|
||||
LOG.info("Forwarding registration request to the real YARN RM");
|
||||
return rmClient.registerApplicationMaster(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AllocateResponse allocate(final AllocateRequest request)
|
||||
throws YarnException, IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Forwarding allocate request to the real YARN RM");
|
||||
}
|
||||
AllocateResponse allocateResponse = rmClient.allocate(request);
|
||||
if (allocateResponse.getAMRMToken() != null) {
|
||||
updateAMRMToken(allocateResponse.getAMRMToken());
|
||||
}
|
||||
|
||||
return allocateResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
final FinishApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
LOG.info("Forwarding finish application request to "
|
||||
+ "the real YARN Resource Manager");
|
||||
return rmClient.finishApplicationMaster(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNextInterceptor(RequestInterceptor next) {
|
||||
throw new YarnRuntimeException(
|
||||
"setNextInterceptor is being called on DefaultRequestInterceptor,"
|
||||
+ "which should be the last one in the chain "
|
||||
+ "Check if the interceptor pipeline configuration is correct");
|
||||
}
|
||||
|
||||
private void updateAMRMToken(Token token) throws IOException {
|
||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
|
||||
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
|
||||
token.getIdentifier().array(), token.getPassword().array(),
|
||||
new Text(token.getKind()), new Text(token.getService()));
|
||||
// Preserve the token service sent by the RM when adding the token
|
||||
// to ensure we replace the previous token setup by the RM.
|
||||
// Afterwards we can update the service address for the RPC layer.
|
||||
user.addToken(amrmToken);
|
||||
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
|
||||
/**
|
||||
* Defines the contract to be implemented by the request intercepter classes,
|
||||
* that can be used to intercept and inspect messages sent from the application
|
||||
* master to the resource manager.
|
||||
*/
|
||||
public interface RequestInterceptor extends ApplicationMasterProtocol,
|
||||
Configurable {
|
||||
/**
|
||||
* This method is called for initializing the intercepter. This is guaranteed
|
||||
* to be called only once in the lifetime of this instance.
|
||||
*
|
||||
* @param ctx
|
||||
*/
|
||||
void init(AMRMProxyApplicationContext ctx);
|
||||
|
||||
/**
|
||||
* This method is called to release the resources held by the intercepter.
|
||||
* This will be called when the application pipeline is being destroyed. The
|
||||
* concrete implementations should dispose the resources and forward the
|
||||
* request to the next intercepter, if any.
|
||||
*/
|
||||
void shutdown();
|
||||
|
||||
/**
|
||||
* Sets the next intercepter in the pipeline. The concrete implementation of
|
||||
* this interface should always pass the request to the nextInterceptor after
|
||||
* inspecting the message. The last intercepter in the chain is responsible to
|
||||
* send the messages to the resource manager service and so the last
|
||||
* intercepter will not receive this method call.
|
||||
*
|
||||
* @param nextInterceptor
|
||||
*/
|
||||
void setNextInterceptor(RequestInterceptor nextInterceptor);
|
||||
|
||||
/**
|
||||
* Returns the next intercepter in the chain.
|
||||
*
|
||||
* @return the next intercepter in the chain
|
||||
*/
|
||||
RequestInterceptor getNextInterceptor();
|
||||
|
||||
/**
|
||||
* Returns the context.
|
||||
*
|
||||
* @return the context
|
||||
*/
|
||||
AMRMProxyApplicationContext getApplicationContext();
|
||||
}
|
|
@ -42,7 +42,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
@ -51,7 +50,6 @@ import org.apache.hadoop.security.SaslRpcServer;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.service.Service;
|
||||
|
@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
|
|||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
|
@ -103,6 +102,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||
|
@ -135,6 +135,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
|
|||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
@ -172,6 +173,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
private boolean serviceStopped = false;
|
||||
private final ReadLock readLock;
|
||||
private final WriteLock writeLock;
|
||||
private AMRMProxyService amrmProxyService;
|
||||
private boolean amrmProxyEnabled = false;
|
||||
|
||||
private long waitForContainersOnShutdownMillis;
|
||||
|
||||
|
@ -235,6 +238,20 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
addService(sharedCacheUploader);
|
||||
dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
|
||||
|
||||
amrmProxyEnabled =
|
||||
conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
|
||||
|
||||
if (amrmProxyEnabled) {
|
||||
LOG.info("AMRMProxyService is enabled. "
|
||||
+ "All the AM->RM requests will be intercepted by the proxy");
|
||||
this.amrmProxyService =
|
||||
new AMRMProxyService(this.context, this.dispatcher);
|
||||
addService(this.amrmProxyService);
|
||||
} else {
|
||||
LOG.info("AMRMProxyService is disabled");
|
||||
}
|
||||
|
||||
waitForContainersOnShutdownMillis =
|
||||
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
|
||||
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
|
||||
|
@ -246,6 +263,10 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
recover();
|
||||
}
|
||||
|
||||
public boolean isARMRMProxyEnabled() {
|
||||
return amrmProxyEnabled;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void recover() throws IOException, URISyntaxException {
|
||||
NMStateStoreService stateStore = context.getNMStateStore();
|
||||
|
@ -314,7 +335,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
+ " with exit code " + rcs.getExitCode());
|
||||
|
||||
if (context.getApplications().containsKey(appId)) {
|
||||
Credentials credentials = parseCredentials(launchContext);
|
||||
Credentials credentials =
|
||||
YarnServerSecurityUtils.parseCredentials(launchContext);
|
||||
Container container = new ContainerImpl(getConfig(), dispatcher,
|
||||
context.getNMStateStore(), req.getContainerLaunchContext(),
|
||||
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
|
||||
|
@ -737,8 +759,17 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
|
||||
containerTokenIdentifier);
|
||||
containerId = containerTokenIdentifier.getContainerID();
|
||||
startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
|
||||
request);
|
||||
|
||||
// Initialize the AMRMProxy service instance only if the container is of
|
||||
// type AM and if the AMRMProxy service is enabled
|
||||
if (isARMRMProxyEnabled()
|
||||
&& containerTokenIdentifier.getContainerType().equals(
|
||||
ContainerType.APPLICATION_MASTER)) {
|
||||
this.amrmProxyService.processApplicationStartRequest(request);
|
||||
}
|
||||
|
||||
startContainerInternal(nmTokenIdentifier,
|
||||
containerTokenIdentifier, request);
|
||||
succeededContainers.add(containerId);
|
||||
} catch (YarnException e) {
|
||||
failedContainers.put(containerId, SerializedException.newInstance(e));
|
||||
|
@ -751,7 +782,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
}
|
||||
|
||||
return StartContainersResponse.newInstance(getAuxServiceMetaData(),
|
||||
succeededContainers, failedContainers);
|
||||
succeededContainers, failedContainers);
|
||||
}
|
||||
|
||||
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
|
||||
|
@ -844,7 +875,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
}
|
||||
}
|
||||
|
||||
Credentials credentials = parseCredentials(launchContext);
|
||||
Credentials credentials =
|
||||
YarnServerSecurityUtils.parseCredentials(launchContext);
|
||||
|
||||
Container container =
|
||||
new ContainerImpl(getConfig(), this.dispatcher,
|
||||
|
@ -928,27 +960,6 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
nmTokenIdentifier);
|
||||
}
|
||||
|
||||
private Credentials parseCredentials(ContainerLaunchContext launchContext)
|
||||
throws IOException {
|
||||
Credentials credentials = new Credentials();
|
||||
// //////////// Parse credentials
|
||||
ByteBuffer tokens = launchContext.getTokens();
|
||||
|
||||
if (tokens != null) {
|
||||
DataInputByteBuffer buf = new DataInputByteBuffer();
|
||||
tokens.rewind();
|
||||
buf.reset(tokens);
|
||||
credentials.readTokenStorageStream(buf);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
|
||||
LOG.debug(tk.getService() + " = " + tk.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
// //////////// End of parsing credentials
|
||||
return credentials;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop a list of containers running on this NodeManager.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,677 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
||||
/**
|
||||
* Base class for all the AMRMProxyService test cases. It provides utility
|
||||
* methods that can be used by the concrete test case classes
|
||||
*
|
||||
*/
|
||||
public abstract class BaseAMRMProxyTest {
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(BaseAMRMProxyTest.class);
|
||||
/**
|
||||
* The AMRMProxyService instance that will be used by all the test cases
|
||||
*/
|
||||
private MockAMRMProxyService amrmProxyService;
|
||||
/**
|
||||
* Thread pool used for asynchronous operations
|
||||
*/
|
||||
private static ExecutorService threadpool = Executors
|
||||
.newCachedThreadPool();
|
||||
private Configuration conf;
|
||||
private AsyncDispatcher dispatcher;
|
||||
|
||||
protected MockAMRMProxyService getAMRMProxyService() {
|
||||
Assert.assertNotNull(this.amrmProxyService);
|
||||
return this.amrmProxyService;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
this.conf = new YarnConfiguration();
|
||||
this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
|
||||
String mockPassThroughInterceptorClass =
|
||||
PassThroughRequestInterceptor.class.getName();
|
||||
|
||||
// Create a request intercepter pipeline for testing. The last one in the
|
||||
// chain will call the mock resource manager. The others in the chain will
|
||||
// simply forward it to the next one in the chain
|
||||
this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
|
||||
mockPassThroughInterceptorClass + ","
|
||||
+ mockPassThroughInterceptorClass + ","
|
||||
+ mockPassThroughInterceptorClass + ","
|
||||
+ MockRequestInterceptor.class.getName());
|
||||
|
||||
this.dispatcher = new AsyncDispatcher();
|
||||
this.dispatcher.init(conf);
|
||||
this.dispatcher.start();
|
||||
this.amrmProxyService = createAndStartAMRMProxyService();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
amrmProxyService.stop();
|
||||
amrmProxyService = null;
|
||||
this.dispatcher.stop();
|
||||
}
|
||||
|
||||
protected ExecutorService getThreadPool() {
|
||||
return threadpool;
|
||||
}
|
||||
|
||||
protected MockAMRMProxyService createAndStartAMRMProxyService() {
|
||||
MockAMRMProxyService svc =
|
||||
new MockAMRMProxyService(new NullContext(), dispatcher);
|
||||
svc.init(conf);
|
||||
svc.start();
|
||||
return svc;
|
||||
}
|
||||
|
||||
/**
|
||||
* This helper method will invoke the specified function in parallel for each
|
||||
* end point in the specified list using a thread pool and return the
|
||||
* responses received from the function. It implements the logic required for
|
||||
* dispatching requests in parallel and waiting for the responses. If any of
|
||||
* the function call fails or times out, it will ignore and proceed with the
|
||||
* rest. So the responses returned can be less than the number of end points
|
||||
* specified
|
||||
*
|
||||
* @param testContext
|
||||
* @param func
|
||||
* @return
|
||||
*/
|
||||
protected <T, R> List<R> runInParallel(List<T> testContexts,
|
||||
final Function<T, R> func) {
|
||||
ExecutorCompletionService<R> completionService =
|
||||
new ExecutorCompletionService<R>(this.getThreadPool());
|
||||
LOG.info("Sending requests to endpoints asynchronously. Number of test contexts="
|
||||
+ testContexts.size());
|
||||
for (int index = 0; index < testContexts.size(); index++) {
|
||||
final T testContext = testContexts.get(index);
|
||||
|
||||
LOG.info("Adding request to threadpool for test context: "
|
||||
+ testContext.toString());
|
||||
|
||||
completionService.submit(new Callable<R>() {
|
||||
@Override
|
||||
public R call() throws Exception {
|
||||
LOG.info("Sending request. Test context:"
|
||||
+ testContext.toString());
|
||||
|
||||
R response = null;
|
||||
try {
|
||||
response = func.invoke(testContext);
|
||||
LOG.info("Successfully sent request for context: "
|
||||
+ testContext.toString());
|
||||
} catch (Throwable ex) {
|
||||
LOG.error("Failed to process request for context: "
|
||||
+ testContext);
|
||||
response = null;
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
ArrayList<R> responseList = new ArrayList<R>();
|
||||
LOG.info("Waiting for responses from endpoints. Number of contexts="
|
||||
+ testContexts.size());
|
||||
for (int i = 0; i < testContexts.size(); ++i) {
|
||||
try {
|
||||
final Future<R> future = completionService.take();
|
||||
final R response = future.get(3000, TimeUnit.MILLISECONDS);
|
||||
responseList.add(response);
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Failed to process request " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
return responseList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to register an application master using specified testAppId
|
||||
* as the application identifier and return the response
|
||||
*
|
||||
* @param testAppId
|
||||
* @return
|
||||
* @throws Exception
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
protected RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
final int testAppId) throws Exception, YarnException, IOException {
|
||||
final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
|
||||
|
||||
return ugi
|
||||
.getUser()
|
||||
.doAs(
|
||||
new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse run()
|
||||
throws Exception {
|
||||
getAMRMProxyService().initApp(
|
||||
ugi.getAppAttemptId(),
|
||||
ugi.getUser().getUserName());
|
||||
|
||||
final RegisterApplicationMasterRequest req =
|
||||
Records
|
||||
.newRecord(RegisterApplicationMasterRequest.class);
|
||||
req.setHost(Integer.toString(testAppId));
|
||||
req.setRpcPort(testAppId);
|
||||
req.setTrackingUrl("");
|
||||
|
||||
RegisterApplicationMasterResponse response =
|
||||
getAMRMProxyService().registerApplicationMaster(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method that can be used to register multiple application masters in
|
||||
* parallel to the specified RM end points
|
||||
*
|
||||
* @param testContexts - used to identify the requests
|
||||
* @return
|
||||
*/
|
||||
protected <T> List<RegisterApplicationMasterResponseInfo<T>> registerApplicationMastersInParallel(
|
||||
final ArrayList<T> testContexts) {
|
||||
List<RegisterApplicationMasterResponseInfo<T>> responses =
|
||||
runInParallel(testContexts,
|
||||
new Function<T, RegisterApplicationMasterResponseInfo<T>>() {
|
||||
@Override
|
||||
public RegisterApplicationMasterResponseInfo<T> invoke(
|
||||
T testContext) {
|
||||
RegisterApplicationMasterResponseInfo<T> response = null;
|
||||
try {
|
||||
int index = testContexts.indexOf(testContext);
|
||||
response =
|
||||
new RegisterApplicationMasterResponseInfo<T>(
|
||||
registerApplicationMaster(index), testContext);
|
||||
Assert.assertNotNull(response.getResponse());
|
||||
Assert.assertEquals(Integer.toString(index), response
|
||||
.getResponse().getQueue());
|
||||
|
||||
LOG.info("Sucessfully registered application master with test context: "
|
||||
+ testContext);
|
||||
} catch (Throwable ex) {
|
||||
response = null;
|
||||
LOG.error("Failed to register application master with test context: "
|
||||
+ testContext);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertEquals(
|
||||
"Number of responses received does not match with request",
|
||||
testContexts.size(), responses.size());
|
||||
|
||||
Set<T> contextResponses = new TreeSet<T>();
|
||||
for (RegisterApplicationMasterResponseInfo<T> item : responses) {
|
||||
contextResponses.add(item.getTestContext());
|
||||
}
|
||||
|
||||
for (T ep : testContexts) {
|
||||
Assert.assertTrue(contextResponses.contains(ep));
|
||||
}
|
||||
|
||||
return responses;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregisters the application master for specified application id
|
||||
*
|
||||
* @param appId
|
||||
* @param status
|
||||
* @return
|
||||
* @throws Exception
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
protected FinishApplicationMasterResponse finishApplicationMaster(
|
||||
final int appId, final FinalApplicationStatus status)
|
||||
throws Exception, YarnException, IOException {
|
||||
|
||||
final ApplicationUserInfo ugi = getApplicationUserInfo(appId);
|
||||
|
||||
return ugi.getUser().doAs(
|
||||
new PrivilegedExceptionAction<FinishApplicationMasterResponse>() {
|
||||
@Override
|
||||
public FinishApplicationMasterResponse run() throws Exception {
|
||||
final FinishApplicationMasterRequest req =
|
||||
Records.newRecord(FinishApplicationMasterRequest.class);
|
||||
req.setDiagnostics("");
|
||||
req.setTrackingUrl("");
|
||||
req.setFinalApplicationStatus(status);
|
||||
|
||||
FinishApplicationMasterResponse response =
|
||||
getAMRMProxyService().finishApplicationMaster(req);
|
||||
|
||||
getAMRMProxyService().stopApp(
|
||||
ugi.getAppAttemptId().getApplicationId());
|
||||
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected <T> List<FinishApplicationMasterResponseInfo<T>> finishApplicationMastersInParallel(
|
||||
final ArrayList<T> testContexts) {
|
||||
List<FinishApplicationMasterResponseInfo<T>> responses =
|
||||
runInParallel(testContexts,
|
||||
new Function<T, FinishApplicationMasterResponseInfo<T>>() {
|
||||
@Override
|
||||
public FinishApplicationMasterResponseInfo<T> invoke(
|
||||
T testContext) {
|
||||
FinishApplicationMasterResponseInfo<T> response = null;
|
||||
try {
|
||||
response =
|
||||
new FinishApplicationMasterResponseInfo<T>(
|
||||
finishApplicationMaster(
|
||||
testContexts.indexOf(testContext),
|
||||
FinalApplicationStatus.SUCCEEDED),
|
||||
testContext);
|
||||
Assert.assertNotNull(response.getResponse());
|
||||
|
||||
LOG.info("Sucessfully finished application master with test contexts: "
|
||||
+ testContext);
|
||||
} catch (Throwable ex) {
|
||||
response = null;
|
||||
LOG.error("Failed to finish application master with test context: "
|
||||
+ testContext);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertEquals(
|
||||
"Number of responses received does not match with request",
|
||||
testContexts.size(), responses.size());
|
||||
|
||||
Set<T> contextResponses = new TreeSet<T>();
|
||||
for (FinishApplicationMasterResponseInfo<T> item : responses) {
|
||||
Assert.assertNotNull(item);
|
||||
Assert.assertNotNull(item.getResponse());
|
||||
contextResponses.add(item.getTestContext());
|
||||
}
|
||||
|
||||
for (T ep : testContexts) {
|
||||
Assert.assertTrue(contextResponses.contains(ep));
|
||||
}
|
||||
|
||||
return responses;
|
||||
}
|
||||
|
||||
protected AllocateResponse allocate(final int testAppId)
|
||||
throws Exception, YarnException, IOException {
|
||||
final AllocateRequest req = Records.newRecord(AllocateRequest.class);
|
||||
req.setResponseId(testAppId);
|
||||
return allocate(testAppId, req);
|
||||
}
|
||||
|
||||
protected AllocateResponse allocate(final int testAppId,
|
||||
final AllocateRequest request) throws Exception, YarnException,
|
||||
IOException {
|
||||
|
||||
final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
|
||||
|
||||
return ugi.getUser().doAs(
|
||||
new PrivilegedExceptionAction<AllocateResponse>() {
|
||||
@Override
|
||||
public AllocateResponse run() throws Exception {
|
||||
AllocateResponse response =
|
||||
getAMRMProxyService().allocate(request);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) {
|
||||
final ApplicationAttemptId attemptId =
|
||||
getApplicationAttemptId(testAppId);
|
||||
|
||||
UserGroupInformation ugi =
|
||||
UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||
AMRMTokenIdentifier token = new AMRMTokenIdentifier(attemptId, 1);
|
||||
ugi.addTokenIdentifier(token);
|
||||
return new ApplicationUserInfo(ugi, attemptId);
|
||||
}
|
||||
|
||||
protected List<ResourceRequest> createResourceRequests(String[] hosts,
|
||||
int memory, int vCores, int priority, int containers)
|
||||
throws Exception {
|
||||
return createResourceRequests(hosts, memory, vCores, priority,
|
||||
containers, null);
|
||||
}
|
||||
|
||||
protected List<ResourceRequest> createResourceRequests(String[] hosts,
|
||||
int memory, int vCores, int priority, int containers,
|
||||
String labelExpression) throws Exception {
|
||||
List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
|
||||
for (String host : hosts) {
|
||||
ResourceRequest hostReq =
|
||||
createResourceRequest(host, memory, vCores, priority,
|
||||
containers, labelExpression);
|
||||
reqs.add(hostReq);
|
||||
ResourceRequest rackReq =
|
||||
createResourceRequest("/default-rack", memory, vCores, priority,
|
||||
containers, labelExpression);
|
||||
reqs.add(rackReq);
|
||||
}
|
||||
|
||||
ResourceRequest offRackReq =
|
||||
createResourceRequest(ResourceRequest.ANY, memory, vCores,
|
||||
priority, containers, labelExpression);
|
||||
reqs.add(offRackReq);
|
||||
return reqs;
|
||||
}
|
||||
|
||||
protected ResourceRequest createResourceRequest(String resource,
|
||||
int memory, int vCores, int priority, int containers)
|
||||
throws Exception {
|
||||
return createResourceRequest(resource, memory, vCores, priority,
|
||||
containers, null);
|
||||
}
|
||||
|
||||
protected ResourceRequest createResourceRequest(String resource,
|
||||
int memory, int vCores, int priority, int containers,
|
||||
String labelExpression) throws Exception {
|
||||
ResourceRequest req = Records.newRecord(ResourceRequest.class);
|
||||
req.setResourceName(resource);
|
||||
req.setNumContainers(containers);
|
||||
Priority pri = Records.newRecord(Priority.class);
|
||||
pri.setPriority(priority);
|
||||
req.setPriority(pri);
|
||||
Resource capability = Records.newRecord(Resource.class);
|
||||
capability.setMemory(memory);
|
||||
capability.setVirtualCores(vCores);
|
||||
req.setCapability(capability);
|
||||
if (labelExpression != null) {
|
||||
req.setNodeLabelExpression(labelExpression);
|
||||
}
|
||||
return req;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an ApplicationId with the specified identifier
|
||||
*
|
||||
* @param testAppId
|
||||
* @return
|
||||
*/
|
||||
protected ApplicationId getApplicationId(int testAppId) {
|
||||
return ApplicationId.newInstance(123456, testAppId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an instance of ApplicationAttemptId using specified identifier. This
|
||||
* identifier will be used for the ApplicationId too.
|
||||
*
|
||||
* @param testAppId
|
||||
* @return
|
||||
*/
|
||||
protected ApplicationAttemptId getApplicationAttemptId(int testAppId) {
|
||||
return ApplicationAttemptId.newInstance(getApplicationId(testAppId),
|
||||
testAppId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an instance of ApplicationAttemptId using specified identifier and
|
||||
* application id
|
||||
*
|
||||
* @param testAppId
|
||||
* @return
|
||||
*/
|
||||
protected ApplicationAttemptId getApplicationAttemptId(int testAppId,
|
||||
ApplicationId appId) {
|
||||
return ApplicationAttemptId.newInstance(appId, testAppId);
|
||||
}
|
||||
|
||||
protected static class RegisterApplicationMasterResponseInfo<T> {
|
||||
private RegisterApplicationMasterResponse response;
|
||||
private T testContext;
|
||||
|
||||
RegisterApplicationMasterResponseInfo(
|
||||
RegisterApplicationMasterResponse response, T testContext) {
|
||||
this.response = response;
|
||||
this.testContext = testContext;
|
||||
}
|
||||
|
||||
public RegisterApplicationMasterResponse getResponse() {
|
||||
return response;
|
||||
}
|
||||
|
||||
public T getTestContext() {
|
||||
return testContext;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class FinishApplicationMasterResponseInfo<T> {
|
||||
private FinishApplicationMasterResponse response;
|
||||
private T testContext;
|
||||
|
||||
FinishApplicationMasterResponseInfo(
|
||||
FinishApplicationMasterResponse response, T testContext) {
|
||||
this.response = response;
|
||||
this.testContext = testContext;
|
||||
}
|
||||
|
||||
public FinishApplicationMasterResponse getResponse() {
|
||||
return response;
|
||||
}
|
||||
|
||||
public T getTestContext() {
|
||||
return testContext;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class ApplicationUserInfo {
|
||||
private UserGroupInformation user;
|
||||
private ApplicationAttemptId attemptId;
|
||||
|
||||
ApplicationUserInfo(UserGroupInformation user,
|
||||
ApplicationAttemptId attemptId) {
|
||||
this.user = user;
|
||||
this.attemptId = attemptId;
|
||||
}
|
||||
|
||||
public UserGroupInformation getUser() {
|
||||
return this.user;
|
||||
}
|
||||
|
||||
public ApplicationAttemptId getAppAttemptId() {
|
||||
return this.attemptId;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class MockAMRMProxyService extends AMRMProxyService {
|
||||
public MockAMRMProxyService(Context nmContext,
|
||||
AsyncDispatcher dispatcher) {
|
||||
super(nmContext, dispatcher);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used by the test code to initialize the pipeline. In the
|
||||
* actual service, the initialization is called by the
|
||||
* ContainerManagerImpl::StartContainers method
|
||||
*
|
||||
* @param applicationId
|
||||
* @param user
|
||||
*/
|
||||
public void initApp(ApplicationAttemptId applicationId, String user) {
|
||||
super.initializePipeline(applicationId, user, null, null);
|
||||
}
|
||||
|
||||
public void stopApp(ApplicationId applicationId) {
|
||||
super.stopApplication(applicationId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The Function interface is used for passing method pointers that can be
|
||||
* invoked asynchronously at a later point.
|
||||
*/
|
||||
protected interface Function<T, R> {
|
||||
public R invoke(T input);
|
||||
}
|
||||
|
||||
protected class NullContext implements Context {
|
||||
|
||||
@Override
|
||||
public NodeId getNodeId() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getHttpPort() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentMap<ApplicationId, Application> getApplications() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentMap<ContainerId, Container> getContainers() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NMContainerTokenSecretManager getContainerTokenSecretManager() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NMTokenSecretManagerInNM getNMTokenSecretManager() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeHealthStatus getNodeHealthStatus() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerManagementProtocol getContainerManager() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LocalDirsHandlerService getLocalDirsHandler() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationACLsManager getApplicationACLsManager() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NMStateStoreService getNMStateStore() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getDecommissioned() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDecommissioned(boolean isDecommissioned) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentLinkedQueue<LogAggregationReport> getLogAggregationStatusForApps() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeResourceMonitor getNodeResourceMonitor() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
public class MockRequestInterceptor extends AbstractRequestInterceptor {
|
||||
|
||||
private MockResourceManagerFacade mockRM;
|
||||
|
||||
public MockRequestInterceptor() {
|
||||
}
|
||||
|
||||
public void init(AMRMProxyApplicationContext appContext) {
|
||||
super.init(appContext);
|
||||
mockRM =
|
||||
new MockResourceManagerFacade(new YarnConfiguration(
|
||||
super.getConf()), 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
return mockRM.registerApplicationMaster(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
FinishApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
return mockRM.finishApplicationMaster(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException, IOException {
|
||||
return mockRM.allocate(request);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,469 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.directory.api.util.Strings;
|
||||
import org.apache.directory.api.util.exception.NotImplementedException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.mortbay.log.Log;
|
||||
|
||||
/**
|
||||
* Mock Resource Manager facade implementation that exposes all the methods
|
||||
* implemented by the YARN RM. The behavior and the values returned by this mock
|
||||
* implementation is expected by the unit test cases. So please change the
|
||||
* implementation with care.
|
||||
*/
|
||||
public class MockResourceManagerFacade implements
|
||||
ApplicationMasterProtocol, ApplicationClientProtocol {
|
||||
|
||||
private HashMap<String, List<ContainerId>> applicationContainerIdMap =
|
||||
new HashMap<String, List<ContainerId>>();
|
||||
private HashMap<ContainerId, Container> allocatedContainerMap =
|
||||
new HashMap<ContainerId, Container>();
|
||||
private AtomicInteger containerIndex = new AtomicInteger(0);
|
||||
private Configuration conf;
|
||||
|
||||
public MockResourceManagerFacade(Configuration conf,
|
||||
int startContainerIndex) {
|
||||
this.conf = conf;
|
||||
this.containerIndex.set(startContainerIndex);
|
||||
}
|
||||
|
||||
private static String getAppIdentifier() throws IOException {
|
||||
AMRMTokenIdentifier result = null;
|
||||
UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
|
||||
Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
|
||||
for (TokenIdentifier tokenId : tokenIds) {
|
||||
if (tokenId instanceof AMRMTokenIdentifier) {
|
||||
result = (AMRMTokenIdentifier) tokenId;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return result != null ? result.getApplicationAttemptId().toString()
|
||||
: "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
String amrmToken = getAppIdentifier();
|
||||
Log.info("Registering application attempt: " + amrmToken);
|
||||
|
||||
synchronized (applicationContainerIdMap) {
|
||||
Assert.assertFalse("The application id is already registered: "
|
||||
+ amrmToken, applicationContainerIdMap.containsKey(amrmToken));
|
||||
// Keep track of the containers that are returned to this application
|
||||
applicationContainerIdMap.put(amrmToken,
|
||||
new ArrayList<ContainerId>());
|
||||
}
|
||||
|
||||
return RegisterApplicationMasterResponse.newInstance(null, null, null,
|
||||
null, null, request.getHost(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
FinishApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
String amrmToken = getAppIdentifier();
|
||||
Log.info("Finishing application attempt: " + amrmToken);
|
||||
|
||||
synchronized (applicationContainerIdMap) {
|
||||
// Remove the containers that were being tracked for this application
|
||||
Assert.assertTrue("The application id is NOT registered: "
|
||||
+ amrmToken, applicationContainerIdMap.containsKey(amrmToken));
|
||||
List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken);
|
||||
for (ContainerId c : ids) {
|
||||
allocatedContainerMap.remove(c);
|
||||
}
|
||||
}
|
||||
|
||||
return FinishApplicationMasterResponse
|
||||
.newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? true
|
||||
: false);
|
||||
}
|
||||
|
||||
protected ApplicationId getApplicationId(int id) {
|
||||
return ApplicationId.newInstance(12345, id);
|
||||
}
|
||||
|
||||
protected ApplicationAttemptId getApplicationAttemptId(int id) {
|
||||
return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException, IOException {
|
||||
if (request.getAskList() != null && request.getAskList().size() > 0
|
||||
&& request.getReleaseList() != null
|
||||
&& request.getReleaseList().size() > 0) {
|
||||
Assert.fail("The mock RM implementation does not support receiving "
|
||||
+ "askList and releaseList in the same heartbeat");
|
||||
}
|
||||
|
||||
String amrmToken = getAppIdentifier();
|
||||
|
||||
ArrayList<Container> containerList = new ArrayList<Container>();
|
||||
if (request.getAskList() != null) {
|
||||
for (ResourceRequest rr : request.getAskList()) {
|
||||
for (int i = 0; i < rr.getNumContainers(); i++) {
|
||||
ContainerId containerId =
|
||||
ContainerId.newInstance(getApplicationAttemptId(1),
|
||||
containerIndex.incrementAndGet());
|
||||
Container container = Records.newRecord(Container.class);
|
||||
container.setId(containerId);
|
||||
container.setPriority(rr.getPriority());
|
||||
|
||||
// We don't use the node for running containers in the test cases. So
|
||||
// it is OK to hard code it to some dummy value
|
||||
NodeId nodeId =
|
||||
NodeId.newInstance(
|
||||
!Strings.isEmpty(rr.getResourceName()) ? rr
|
||||
.getResourceName() : "dummy", 1000);
|
||||
container.setNodeId(nodeId);
|
||||
container.setResource(rr.getCapability());
|
||||
containerList.add(container);
|
||||
|
||||
synchronized (applicationContainerIdMap) {
|
||||
// Keep track of the containers returned to this application. We
|
||||
// will need it in future
|
||||
Assert.assertTrue(
|
||||
"The application id is Not registered before allocate(): "
|
||||
+ amrmToken,
|
||||
applicationContainerIdMap.containsKey(amrmToken));
|
||||
List<ContainerId> ids =
|
||||
applicationContainerIdMap.get(amrmToken);
|
||||
ids.add(containerId);
|
||||
this.allocatedContainerMap.put(containerId, container);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (request.getReleaseList() != null
|
||||
&& request.getReleaseList().size() > 0) {
|
||||
Log.info("Releasing containers: " + request.getReleaseList().size());
|
||||
synchronized (applicationContainerIdMap) {
|
||||
Assert.assertTrue(
|
||||
"The application id is not registered before allocate(): "
|
||||
+ amrmToken,
|
||||
applicationContainerIdMap.containsKey(amrmToken));
|
||||
List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
|
||||
|
||||
for (ContainerId id : request.getReleaseList()) {
|
||||
boolean found = false;
|
||||
for (ContainerId c : ids) {
|
||||
if (c.equals(id)) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertTrue(
|
||||
"ContainerId " + id
|
||||
+ " being released is not valid for application: "
|
||||
+ conf.get("AMRMTOKEN"), found);
|
||||
|
||||
ids.remove(id);
|
||||
|
||||
// Return the released container back to the AM with new fake Ids. The
|
||||
// test case does not care about the IDs. The IDs are faked because
|
||||
// otherwise the LRM will throw duplication identifier exception. This
|
||||
// returning of fake containers is ONLY done for testing purpose - for
|
||||
// the test code to get confirmation that the sub-cluster resource
|
||||
// managers received the release request
|
||||
ContainerId fakeContainerId =
|
||||
ContainerId.newInstance(getApplicationAttemptId(1),
|
||||
containerIndex.incrementAndGet());
|
||||
Container fakeContainer = allocatedContainerMap.get(id);
|
||||
fakeContainer.setId(fakeContainerId);
|
||||
containerList.add(fakeContainer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Log.info("Allocating containers: " + containerList.size()
|
||||
+ " for application attempt: " + conf.get("AMRMTOKEN"));
|
||||
return AllocateResponse.newInstance(0,
|
||||
new ArrayList<ContainerStatus>(), containerList,
|
||||
new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
|
||||
new ArrayList<NMToken>(),
|
||||
new ArrayList<ContainerResourceIncrease>(),
|
||||
new ArrayList<ContainerResourceDecrease>());
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationReportResponse getApplicationReport(
|
||||
GetApplicationReportRequest request) throws YarnException,
|
||||
IOException {
|
||||
|
||||
GetApplicationReportResponse response =
|
||||
Records.newRecord(GetApplicationReportResponse.class);
|
||||
ApplicationReport report = Records.newRecord(ApplicationReport.class);
|
||||
report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
|
||||
report.setApplicationId(request.getApplicationId());
|
||||
report.setCurrentApplicationAttemptId(ApplicationAttemptId
|
||||
.newInstance(request.getApplicationId(), 1));
|
||||
response.setApplicationReport(report);
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
||||
GetApplicationAttemptReportRequest request) throws YarnException,
|
||||
IOException {
|
||||
GetApplicationAttemptReportResponse response =
|
||||
Records.newRecord(GetApplicationAttemptReportResponse.class);
|
||||
ApplicationAttemptReport report =
|
||||
Records.newRecord(ApplicationAttemptReport.class);
|
||||
report.setApplicationAttemptId(request.getApplicationAttemptId());
|
||||
report
|
||||
.setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED);
|
||||
response.setApplicationAttemptReport(report);
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNewApplicationResponse getNewApplication(
|
||||
GetNewApplicationRequest request) throws YarnException, IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubmitApplicationResponse submitApplication(
|
||||
SubmitApplicationRequest request) throws YarnException, IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KillApplicationResponse forceKillApplication(
|
||||
KillApplicationRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterMetricsResponse getClusterMetrics(
|
||||
GetClusterMetricsRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationsResponse getApplications(
|
||||
GetApplicationsRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterNodesResponse getClusterNodes(
|
||||
GetClusterNodesRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetQueueUserAclsInfoResponse getQueueUserAcls(
|
||||
GetQueueUserAclsInfoRequest request) throws YarnException,
|
||||
IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetDelegationTokenResponse getDelegationToken(
|
||||
GetDelegationTokenRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RenewDelegationTokenResponse renewDelegationToken(
|
||||
RenewDelegationTokenRequest request) throws YarnException,
|
||||
IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CancelDelegationTokenResponse cancelDelegationToken(
|
||||
CancelDelegationTokenRequest request) throws YarnException,
|
||||
IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest request) throws YarnException,
|
||||
IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||
GetApplicationAttemptsRequest request) throws YarnException,
|
||||
IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainerReportResponse getContainerReport(
|
||||
GetContainerReportRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainersResponse getContainers(GetContainersRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReservationSubmissionResponse submitReservation(
|
||||
ReservationSubmissionRequest request) throws YarnException,
|
||||
IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReservationUpdateResponse updateReservation(
|
||||
ReservationUpdateRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReservationDeleteResponse deleteReservation(
|
||||
ReservationDeleteRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNodesToLabelsResponse getNodeToLabels(
|
||||
GetNodesToLabelsRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
||||
GetClusterNodeLabelsRequest request) throws YarnException,
|
||||
IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetLabelsToNodesResponse getLabelsToNodes(
|
||||
GetLabelsToNodesRequest request) throws YarnException, IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationPriorityResponse updateApplicationPriority(
|
||||
UpdateApplicationPriorityRequest request) throws YarnException,
|
||||
IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
/**
|
||||
* Mock intercepter that does not do anything other than forwarding it to the
|
||||
* next intercepter in the chain
|
||||
*
|
||||
*/
|
||||
public class PassThroughRequestInterceptor extends
|
||||
AbstractRequestInterceptor {
|
||||
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
return getNextInterceptor().registerApplicationMaster(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
FinishApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
return getNextInterceptor().finishApplicationMaster(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException, IOException {
|
||||
return getNextInterceptor().allocate(request);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,484 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestAMRMProxyService extends BaseAMRMProxyTest {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(TestAMRMProxyService.class);
|
||||
|
||||
/**
|
||||
* Test if the pipeline is created properly.
|
||||
*/
|
||||
@Test
|
||||
public void testRequestInterceptorChainCreation() throws Exception {
|
||||
RequestInterceptor root =
|
||||
super.getAMRMProxyService().createRequestInterceptorChain();
|
||||
int index = 0;
|
||||
while (root != null) {
|
||||
switch (index) {
|
||||
case 0:
|
||||
case 1:
|
||||
case 2:
|
||||
Assert.assertEquals(PassThroughRequestInterceptor.class.getName(),
|
||||
root.getClass().getName());
|
||||
break;
|
||||
case 3:
|
||||
Assert.assertEquals(MockRequestInterceptor.class.getName(), root
|
||||
.getClass().getName());
|
||||
break;
|
||||
}
|
||||
|
||||
root = root.getNextInterceptor();
|
||||
index++;
|
||||
}
|
||||
|
||||
Assert.assertEquals(
|
||||
"The number of interceptors in chain does not match",
|
||||
Integer.toString(4), Integer.toString(index));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests registration of a single application master.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testRegisterOneApplicationMaster() throws Exception {
|
||||
// The testAppId identifier is used as host name and the mock resource
|
||||
// manager return it as the queue name. Assert that we received the queue
|
||||
// name
|
||||
int testAppId = 1;
|
||||
RegisterApplicationMasterResponse response1 =
|
||||
registerApplicationMaster(testAppId);
|
||||
Assert.assertNotNull(response1);
|
||||
Assert.assertEquals(Integer.toString(testAppId), response1.getQueue());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the registration of multiple application master serially one at a
|
||||
* time.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testRegisterMulitpleApplicationMasters() throws Exception {
|
||||
for (int testAppId = 0; testAppId < 3; testAppId++) {
|
||||
RegisterApplicationMasterResponse response =
|
||||
registerApplicationMaster(testAppId);
|
||||
Assert.assertNotNull(response);
|
||||
Assert
|
||||
.assertEquals(Integer.toString(testAppId), response.getQueue());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the registration of multiple application masters using multiple
|
||||
* threads in parallel.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testRegisterMulitpleApplicationMastersInParallel()
|
||||
throws Exception {
|
||||
int numberOfRequests = 5;
|
||||
ArrayList<String> testContexts =
|
||||
CreateTestRequestIdentifiers(numberOfRequests);
|
||||
super.registerApplicationMastersInParallel(testContexts);
|
||||
}
|
||||
|
||||
private ArrayList<String> CreateTestRequestIdentifiers(
|
||||
int numberOfRequests) {
|
||||
ArrayList<String> testContexts = new ArrayList<String>();
|
||||
LOG.info("Creating " + numberOfRequests + " contexts for testing");
|
||||
for (int ep = 0; ep < numberOfRequests; ep++) {
|
||||
testContexts.add("test-endpoint-" + Integer.toString(ep));
|
||||
LOG.info("Created test context: " + testContexts.get(ep));
|
||||
}
|
||||
return testContexts;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinishOneApplicationMasterWithSuccess() throws Exception {
|
||||
int testAppId = 1;
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
registerApplicationMaster(testAppId);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
Assert.assertEquals(Integer.toString(testAppId),
|
||||
registerResponse.getQueue());
|
||||
|
||||
FinishApplicationMasterResponse finshResponse =
|
||||
finishApplicationMaster(testAppId,
|
||||
FinalApplicationStatus.SUCCEEDED);
|
||||
|
||||
Assert.assertNotNull(finshResponse);
|
||||
Assert.assertEquals(true, finshResponse.getIsUnregistered());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinishOneApplicationMasterWithFailure() throws Exception {
|
||||
int testAppId = 1;
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
registerApplicationMaster(testAppId);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
Assert.assertEquals(Integer.toString(testAppId),
|
||||
registerResponse.getQueue());
|
||||
|
||||
FinishApplicationMasterResponse finshResponse =
|
||||
finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
|
||||
|
||||
Assert.assertNotNull(finshResponse);
|
||||
Assert.assertEquals(false, finshResponse.getIsUnregistered());
|
||||
|
||||
try {
|
||||
// Try to finish an application master that is already finished.
|
||||
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
|
||||
Assert
|
||||
.fail("The request to finish application master should have failed");
|
||||
} catch (Throwable ex) {
|
||||
// This is expected. So nothing required here.
|
||||
LOG.info("Finish registration failed as expected because it was not registered");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinishInvalidApplicationMaster() throws Exception {
|
||||
try {
|
||||
// Try to finish an application master that was not registered.
|
||||
finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
|
||||
Assert
|
||||
.fail("The request to finish application master should have failed");
|
||||
} catch (Throwable ex) {
|
||||
// This is expected. So nothing required here.
|
||||
LOG.info("Finish registration failed as expected because it was not registered");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinishMulitpleApplicationMasters() throws Exception {
|
||||
int numberOfRequests = 3;
|
||||
for (int index = 0; index < numberOfRequests; index++) {
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
registerApplicationMaster(index);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
Assert.assertEquals(Integer.toString(index),
|
||||
registerResponse.getQueue());
|
||||
}
|
||||
|
||||
// Finish in reverse sequence
|
||||
for (int index = numberOfRequests - 1; index >= 0; index--) {
|
||||
FinishApplicationMasterResponse finshResponse =
|
||||
finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED);
|
||||
|
||||
Assert.assertNotNull(finshResponse);
|
||||
Assert.assertEquals(true, finshResponse.getIsUnregistered());
|
||||
|
||||
// Assert that the application has been removed from the collection
|
||||
Assert.assertTrue(this.getAMRMProxyService()
|
||||
.getPipelines().size() == index);
|
||||
}
|
||||
|
||||
try {
|
||||
// Try to finish an application master that is already finished.
|
||||
finishApplicationMaster(1, FinalApplicationStatus.SUCCEEDED);
|
||||
Assert
|
||||
.fail("The request to finish application master should have failed");
|
||||
} catch (Throwable ex) {
|
||||
// This is expected. So nothing required here.
|
||||
LOG.info("Finish registration failed as expected because it was not registered");
|
||||
}
|
||||
|
||||
try {
|
||||
// Try to finish an application master that was not registered.
|
||||
finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
|
||||
Assert
|
||||
.fail("The request to finish application master should have failed");
|
||||
} catch (Throwable ex) {
|
||||
// This is expected. So nothing required here.
|
||||
LOG.info("Finish registration failed as expected because it was not registered");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinishMulitpleApplicationMastersInParallel()
|
||||
throws Exception {
|
||||
int numberOfRequests = 5;
|
||||
ArrayList<String> testContexts = new ArrayList<String>();
|
||||
LOG.info("Creating " + numberOfRequests + " contexts for testing");
|
||||
for (int i = 0; i < numberOfRequests; i++) {
|
||||
testContexts.add("test-endpoint-" + Integer.toString(i));
|
||||
LOG.info("Created test context: " + testContexts.get(i));
|
||||
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
registerApplicationMaster(i);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
Assert
|
||||
.assertEquals(Integer.toString(i), registerResponse.getQueue());
|
||||
}
|
||||
|
||||
finishApplicationMastersInParallel(testContexts);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllocateRequestWithNullValues() throws Exception {
|
||||
int testAppId = 1;
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
registerApplicationMaster(testAppId);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
Assert.assertEquals(Integer.toString(testAppId),
|
||||
registerResponse.getQueue());
|
||||
|
||||
AllocateResponse allocateResponse = allocate(testAppId);
|
||||
Assert.assertNotNull(allocateResponse);
|
||||
|
||||
FinishApplicationMasterResponse finshResponse =
|
||||
finishApplicationMaster(testAppId,
|
||||
FinalApplicationStatus.SUCCEEDED);
|
||||
|
||||
Assert.assertNotNull(finshResponse);
|
||||
Assert.assertEquals(true, finshResponse.getIsUnregistered());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllocateRequestWithoutRegistering() throws Exception {
|
||||
|
||||
try {
|
||||
// Try to allocate an application master without registering.
|
||||
allocate(1);
|
||||
Assert
|
||||
.fail("The request to allocate application master should have failed");
|
||||
} catch (Throwable ex) {
|
||||
// This is expected. So nothing required here.
|
||||
LOG.info("AllocateRequest failed as expected because AM was not registered");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllocateWithOneResourceRequest() throws Exception {
|
||||
int testAppId = 1;
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
registerApplicationMaster(testAppId);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
getContainersAndAssert(testAppId, 1);
|
||||
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllocateWithMultipleResourceRequest() throws Exception {
|
||||
int testAppId = 1;
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
registerApplicationMaster(testAppId);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
getContainersAndAssert(testAppId, 10);
|
||||
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllocateAndReleaseContainers() throws Exception {
|
||||
int testAppId = 1;
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
registerApplicationMaster(testAppId);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
List<Container> containers = getContainersAndAssert(testAppId, 10);
|
||||
releaseContainersAndAssert(testAppId, containers);
|
||||
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllocateAndReleaseContainersForMultipleAM()
|
||||
throws Exception {
|
||||
int numberOfApps = 5;
|
||||
for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
registerApplicationMaster(testAppId);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
List<Container> containers = getContainersAndAssert(testAppId, 10);
|
||||
releaseContainersAndAssert(testAppId, containers);
|
||||
}
|
||||
for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
|
||||
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllocateAndReleaseContainersForMultipleAMInParallel()
|
||||
throws Exception {
|
||||
int numberOfApps = 6;
|
||||
ArrayList<Integer> tempAppIds = new ArrayList<Integer>();
|
||||
for (int i = 0; i < numberOfApps; i++) {
|
||||
tempAppIds.add(new Integer(i));
|
||||
}
|
||||
|
||||
final ArrayList<Integer> appIds = tempAppIds;
|
||||
List<Integer> responses =
|
||||
runInParallel(appIds, new Function<Integer, Integer>() {
|
||||
@Override
|
||||
public Integer invoke(Integer testAppId) {
|
||||
try {
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
registerApplicationMaster(testAppId);
|
||||
Assert.assertNotNull("response is null", registerResponse);
|
||||
List<Container> containers =
|
||||
getContainersAndAssert(testAppId, 10);
|
||||
releaseContainersAndAssert(testAppId, containers);
|
||||
|
||||
LOG.info("Sucessfully registered application master with appId: "
|
||||
+ testAppId);
|
||||
} catch (Throwable ex) {
|
||||
LOG.error(
|
||||
"Failed to register application master with appId: "
|
||||
+ testAppId, ex);
|
||||
testAppId = null;
|
||||
}
|
||||
|
||||
return testAppId;
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertEquals(
|
||||
"Number of responses received does not match with request",
|
||||
appIds.size(), responses.size());
|
||||
|
||||
for (Integer testAppId : responses) {
|
||||
Assert.assertNotNull(testAppId);
|
||||
finishApplicationMaster(testAppId.intValue(),
|
||||
FinalApplicationStatus.SUCCEEDED);
|
||||
}
|
||||
}
|
||||
|
||||
private List<Container> getContainersAndAssert(int appId,
|
||||
int numberOfResourceRequests) throws Exception {
|
||||
AllocateRequest allocateRequest =
|
||||
Records.newRecord(AllocateRequest.class);
|
||||
allocateRequest.setResponseId(1);
|
||||
|
||||
List<Container> containers =
|
||||
new ArrayList<Container>(numberOfResourceRequests);
|
||||
List<ResourceRequest> askList =
|
||||
new ArrayList<ResourceRequest>(numberOfResourceRequests);
|
||||
for (int testAppId = 0; testAppId < numberOfResourceRequests; testAppId++) {
|
||||
askList.add(createResourceRequest(
|
||||
"test-node-" + Integer.toString(testAppId), 6000, 2,
|
||||
testAppId % 5, 1));
|
||||
}
|
||||
|
||||
allocateRequest.setAskList(askList);
|
||||
|
||||
AllocateResponse allocateResponse = allocate(appId, allocateRequest);
|
||||
Assert.assertNotNull("allocate() returned null response",
|
||||
allocateResponse);
|
||||
|
||||
containers.addAll(allocateResponse.getAllocatedContainers());
|
||||
|
||||
// Send max 10 heart beats to receive all the containers. If not, we will
|
||||
// fail the test
|
||||
int numHeartbeat = 0;
|
||||
while (containers.size() < askList.size() && numHeartbeat++ < 10) {
|
||||
allocateResponse =
|
||||
allocate(appId, Records.newRecord(AllocateRequest.class));
|
||||
Assert.assertNotNull("allocate() returned null response",
|
||||
allocateResponse);
|
||||
|
||||
containers.addAll(allocateResponse.getAllocatedContainers());
|
||||
|
||||
LOG.info("Number of allocated containers in this request: "
|
||||
+ Integer.toString(allocateResponse.getAllocatedContainers()
|
||||
.size()));
|
||||
LOG.info("Total number of allocated containers: "
|
||||
+ Integer.toString(containers.size()));
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
// We broadcast the request, the number of containers we received will be
|
||||
// higher than we ask
|
||||
Assert.assertTrue("The asklist count is not same as response",
|
||||
askList.size() <= containers.size());
|
||||
return containers;
|
||||
}
|
||||
|
||||
private void releaseContainersAndAssert(int appId,
|
||||
List<Container> containers) throws Exception {
|
||||
Assert.assertTrue(containers.size() > 0);
|
||||
AllocateRequest allocateRequest =
|
||||
Records.newRecord(AllocateRequest.class);
|
||||
allocateRequest.setResponseId(1);
|
||||
|
||||
List<ContainerId> relList =
|
||||
new ArrayList<ContainerId>(containers.size());
|
||||
for (Container container : containers) {
|
||||
relList.add(container.getId());
|
||||
}
|
||||
|
||||
allocateRequest.setReleaseList(relList);
|
||||
|
||||
AllocateResponse allocateResponse = allocate(appId, allocateRequest);
|
||||
Assert.assertNotNull(allocateResponse);
|
||||
|
||||
// The way the mock resource manager is setup, it will return the containers
|
||||
// that were released in the response. This is done because the UAMs run
|
||||
// asynchronously and we need to if all the resource managers received the
|
||||
// release it. The containers sent by the mock resource managers will be
|
||||
// aggregated and returned back to us and we can assert if all the release
|
||||
// lists reached the sub-clusters
|
||||
List<Container> containersForReleasedContainerIds =
|
||||
new ArrayList<Container>();
|
||||
containersForReleasedContainerIds.addAll(allocateResponse
|
||||
.getAllocatedContainers());
|
||||
|
||||
// Send max 10 heart beats to receive all the containers. If not, we will
|
||||
// fail the test
|
||||
int numHeartbeat = 0;
|
||||
while (containersForReleasedContainerIds.size() < relList.size()
|
||||
&& numHeartbeat++ < 10) {
|
||||
allocateResponse =
|
||||
allocate(appId, Records.newRecord(AllocateRequest.class));
|
||||
Assert.assertNotNull(allocateResponse);
|
||||
containersForReleasedContainerIds.addAll(allocateResponse
|
||||
.getAllocatedContainers());
|
||||
|
||||
LOG.info("Number of containers received in this request: "
|
||||
+ Integer.toString(allocateResponse.getAllocatedContainers()
|
||||
.size()));
|
||||
LOG.info("Total number of containers received: "
|
||||
+ Integer.toString(containersForReleasedContainerIds.size()));
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
Assert.assertEquals(relList.size(),
|
||||
containersForReleasedContainerIds.size());
|
||||
}
|
||||
}
|
|
@ -40,9 +40,7 @@ import org.apache.hadoop.security.SaslRpcServer;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
|
@ -76,7 +74,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
|
@ -95,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -175,69 +173,13 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
return this.masterServiceAddress;
|
||||
}
|
||||
|
||||
// Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
|
||||
// currently sets only the required id, but iterate through anyways just to be
|
||||
// sure.
|
||||
private AMRMTokenIdentifier selectAMRMTokenIdentifier(
|
||||
UserGroupInformation remoteUgi) throws IOException {
|
||||
AMRMTokenIdentifier result = null;
|
||||
Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
|
||||
for (TokenIdentifier tokenId : tokenIds) {
|
||||
if (tokenId instanceof AMRMTokenIdentifier) {
|
||||
result = (AMRMTokenIdentifier) tokenId;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private AMRMTokenIdentifier authorizeRequest()
|
||||
throws YarnException {
|
||||
|
||||
UserGroupInformation remoteUgi;
|
||||
try {
|
||||
remoteUgi = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException e) {
|
||||
String msg =
|
||||
"Cannot obtain the user-name for authorizing ApplicationMaster. "
|
||||
+ "Got exception: " + StringUtils.stringifyException(e);
|
||||
LOG.warn(msg);
|
||||
throw RPCUtil.getRemoteException(msg);
|
||||
}
|
||||
|
||||
boolean tokenFound = false;
|
||||
String message = "";
|
||||
AMRMTokenIdentifier appTokenIdentifier = null;
|
||||
try {
|
||||
appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
|
||||
if (appTokenIdentifier == null) {
|
||||
tokenFound = false;
|
||||
message = "No AMRMToken found for user " + remoteUgi.getUserName();
|
||||
} else {
|
||||
tokenFound = true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
tokenFound = false;
|
||||
message =
|
||||
"Got exception while looking for AMRMToken for user "
|
||||
+ remoteUgi.getUserName();
|
||||
}
|
||||
|
||||
if (!tokenFound) {
|
||||
LOG.warn(message);
|
||||
throw RPCUtil.getRemoteException(message);
|
||||
}
|
||||
|
||||
return appTokenIdentifier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
|
||||
AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
|
||||
AMRMTokenIdentifier amrmTokenIdentifier =
|
||||
YarnServerSecurityUtils.authorizeRequest();
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
amrmTokenIdentifier.getApplicationAttemptId();
|
||||
|
||||
|
@ -346,7 +288,7 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
IOException {
|
||||
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
authorizeRequest().getApplicationAttemptId();
|
||||
YarnServerSecurityUtils.authorizeRequest().getApplicationAttemptId();
|
||||
ApplicationId appId = applicationAttemptId.getApplicationId();
|
||||
|
||||
RMApp rmApp =
|
||||
|
@ -430,7 +372,8 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
|
||||
AMRMTokenIdentifier amrmTokenIdentifier =
|
||||
YarnServerSecurityUtils.authorizeRequest();
|
||||
|
||||
ApplicationAttemptId appAttemptId =
|
||||
amrmTokenIdentifier.getApplicationAttemptId();
|
||||
|
|
Loading…
Reference in New Issue