From f23be93dd1cc78e9f831d9059647c83059f1d847 Mon Sep 17 00:00:00 2001 From: Jian He Date: Tue, 8 Sep 2015 09:35:46 +0800 Subject: [PATCH] YARN-2884. Added a proxy service in NM to proxy the the communication between AM and RM. Contributed by Kishore Chaliparambil (cherry picked from commit 6f72f1e6003ab11679bebeb96f27f1f62b3b3e02) --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 17 + .../conf/TestYarnConfigurationFields.java | 2 + .../src/main/resources/yarn-default.xml | 34 + .../server/utils/YarnServerSecurityUtils.java | 142 ++++ .../AMRMProxyApplicationContext.java | 70 ++ .../AMRMProxyApplicationContextImpl.java | 132 ++++ .../amrmproxy/AMRMProxyService.java | 592 +++++++++++++++ .../AMRMProxyTokenSecretManager.java | 265 +++++++ .../amrmproxy/AbstractRequestInterceptor.java | 102 +++ .../amrmproxy/DefaultRequestInterceptor.java | 138 ++++ .../amrmproxy/RequestInterceptor.java | 71 ++ .../ContainerManagerImpl.java | 67 +- .../amrmproxy/BaseAMRMProxyTest.java | 677 ++++++++++++++++++ .../amrmproxy/MockRequestInterceptor.java | 65 ++ .../amrmproxy/MockResourceManagerFacade.java | 469 ++++++++++++ .../PassThroughRequestInterceptor.java | 58 ++ .../amrmproxy/TestAMRMProxyService.java | 484 +++++++++++++ .../ApplicationMasterService.java | 69 +- 19 files changed, 3366 insertions(+), 91 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7aede89b72d..24e4bd4877a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -132,6 +132,9 @@ Release 2.8.0 - UNRELEASED YARN-3970. Add REST api support for Application Priority. (Naganarasimha G R via vvasudev) + YARN-2884. Added a proxy service in NM to proxy the the communication + between AM and RM. (Kishore Chaliparambil via jianhe) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9d03470d51e..182be8e6a71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1332,6 +1332,23 @@ private static void addDeprecatedKeys() { public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX + "application.classpath"; + public static final String AMRM_PROXY_ENABLED = NM_PREFIX + + "amrmproxy.enable"; + public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false; + public static final String AMRM_PROXY_ADDRESS = NM_PREFIX + + "amrmproxy.address"; + public static final int DEFAULT_AMRM_PROXY_PORT = 8048; + public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:" + + DEFAULT_AMRM_PROXY_PORT; + public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX + + "amrmproxy.client.thread-count"; + public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25; + public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = + NM_PREFIX + "amrmproxy.interceptor-class.pipeline"; + public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = + "org.apache.hadoop.yarn.server.nodemanager.amrmproxy." + + "DefaultRequestInterceptor"; + /** * Default platform-agnostic CLASSPATH for YARN applications. A * comma-separated list of CLASSPATH entries. The parameter expansion marker diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index e89a90dfb0b..97fcfa1c66c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -86,6 +86,8 @@ public void initializeMemberVariables() { .add(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS); configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL); + configurationPropsToSkipCompare + .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); // Ignore all YARN Application Timeline Service (version 1) properties configurationPrefixToSkipCompare.add("yarn.timeline-service."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 59bfb569865..b76defb712d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2259,4 +2259,38 @@ + + + Enable/Disable AMRMProxyService in the node manager. This service is used to intercept + calls from the application masters to the resource manager. + + yarn.nodemanager.amrmproxy.enable + false + + + + + The address of the AMRMProxyService listener. + + yarn.nodemanager.amrmproxy.address + 0.0.0.0:8048 + + + + + The number of threads used to handle requests by the AMRMProxyService. + + yarn.nodemanager.amrmproxy.client.thread-count + 25 + + + + + The comma separated list of class names that implement the RequestInterceptor interface. This is used by the + AMRMProxyService to create the request processing pipeline for applications. + + yarn.nodemanager.amrmproxy.interceptor-class.pipeline + org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java new file mode 100644 index 00000000000..9af556e7c8b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.utils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class that contains commonly used server methods. + * + */ +@Private +public final class YarnServerSecurityUtils { + private static final Logger LOG = LoggerFactory + .getLogger(YarnServerSecurityUtils.class); + + private YarnServerSecurityUtils() { + } + + /** + * Authorizes the current request and returns the AMRMTokenIdentifier for the + * current application. + * + * @return the AMRMTokenIdentifier instance for the current user + * @throws YarnException + */ + public static AMRMTokenIdentifier authorizeRequest() + throws YarnException { + + UserGroupInformation remoteUgi; + try { + remoteUgi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + String msg = + "Cannot obtain the user-name for authorizing ApplicationMaster. " + + "Got exception: " + StringUtils.stringifyException(e); + LOG.warn(msg); + throw RPCUtil.getRemoteException(msg); + } + + boolean tokenFound = false; + String message = ""; + AMRMTokenIdentifier appTokenIdentifier = null; + try { + appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi); + if (appTokenIdentifier == null) { + tokenFound = false; + message = "No AMRMToken found for user " + remoteUgi.getUserName(); + } else { + tokenFound = true; + } + } catch (IOException e) { + tokenFound = false; + message = + "Got exception while looking for AMRMToken for user " + + remoteUgi.getUserName(); + } + + if (!tokenFound) { + LOG.warn(message); + throw RPCUtil.getRemoteException(message); + } + + return appTokenIdentifier; + } + + // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer + // currently sets only the required id, but iterate through anyways just to be + // sure. + private static AMRMTokenIdentifier selectAMRMTokenIdentifier( + UserGroupInformation remoteUgi) throws IOException { + AMRMTokenIdentifier result = null; + Set tokenIds = remoteUgi.getTokenIdentifiers(); + for (TokenIdentifier tokenId : tokenIds) { + if (tokenId instanceof AMRMTokenIdentifier) { + result = (AMRMTokenIdentifier) tokenId; + break; + } + } + + return result; + } + + /** + * Parses the container launch context and returns a Credential instance that + * contains all the tokens from the launch context. + * @param launchContext + * @return the credential instance + * @throws IOException + */ + public static Credentials parseCredentials( + ContainerLaunchContext launchContext) throws IOException { + Credentials credentials = new Credentials(); + ByteBuffer tokens = launchContext.getTokens(); + + if (tokens != null) { + DataInputByteBuffer buf = new DataInputByteBuffer(); + tokens.rewind(); + buf.reset(tokens); + credentials.readTokenStorageStream(buf); + if (LOG.isDebugEnabled()) { + for (Token tk : credentials + .getAllTokens()) { + LOG.debug(tk.getService() + " = " + tk.toString()); + } + } + } + + return credentials; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java new file mode 100644 index 00000000000..c355a8b4f42 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.Context; + +/** + * Interface that can be used by the intercepter plugins to get the information + * about one application. + * + */ +public interface AMRMProxyApplicationContext { + + /** + * Gets the configuration object instance. + * @return the configuration object. + */ + Configuration getConf(); + + /** + * Gets the application attempt identifier. + * @return the application attempt identifier. + */ + ApplicationAttemptId getApplicationAttemptId(); + + /** + * Gets the application submitter. + * @return the application submitter + */ + String getUser(); + + /** + * Gets the application's AMRMToken that is issued by the RM. + * @return the application's AMRMToken that is issued by the RM. + */ + Token getAMRMToken(); + + /** + * Gets the application's local AMRMToken issued by the proxy service. + * @return the application's local AMRMToken issued by the proxy service. + */ + Token getLocalAMRMToken(); + + /** + * Gets the NMContext object. + * @return the NMContext. + */ + Context getNMCotext(); + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java new file mode 100644 index 00000000000..2e5aa94128b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.Context; + +/** + * Encapsulates the information about one application that is needed by the + * request intercepters. + * + */ +public class AMRMProxyApplicationContextImpl implements + AMRMProxyApplicationContext { + private final Configuration conf; + private final Context nmContext; + private final ApplicationAttemptId applicationAttemptId; + private final String user; + private Integer localTokenKeyId; + private Token amrmToken; + private Token localToken; + + /** + * Create an instance of the AMRMProxyApplicationContext. + * + * @param nmContext + * @param conf + * @param applicationAttemptId + * @param user + * @param amrmToken + */ + public AMRMProxyApplicationContextImpl(Context nmContext, + Configuration conf, ApplicationAttemptId applicationAttemptId, + String user, Token amrmToken, + Token localToken) { + this.nmContext = nmContext; + this.conf = conf; + this.applicationAttemptId = applicationAttemptId; + this.user = user; + this.amrmToken = amrmToken; + this.localToken = localToken; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public ApplicationAttemptId getApplicationAttemptId() { + return applicationAttemptId; + } + + @Override + public String getUser() { + return user; + } + + @Override + public synchronized Token getAMRMToken() { + return amrmToken; + } + + /** + * Sets the application's AMRMToken. + */ + public synchronized void setAMRMToken( + Token amrmToken) { + this.amrmToken = amrmToken; + } + + @Override + public synchronized Token getLocalAMRMToken() { + return this.localToken; + } + + /** + * Sets the application's AMRMToken. + */ + public synchronized void setLocalAMRMToken( + Token localToken) { + this.localToken = localToken; + this.localTokenKeyId = null; + } + + @Private + public synchronized int getLocalAMRMTokenKeyId() { + Integer keyId = this.localTokenKeyId; + if (keyId == null) { + try { + if (this.localToken == null) { + throw new YarnRuntimeException("Missing AMRM token for " + + this.applicationAttemptId); + } + keyId = this.amrmToken.decodeIdentifier().getKeyId(); + this.localTokenKeyId = keyId; + } catch (IOException e) { + throw new YarnRuntimeException("AMRM token decode error for " + + this.applicationAttemptId, e); + } + } + return keyId; + } + + @Override + public Context getNMCotext() { + return nmContext; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java new file mode 100644 index 00000000000..bd6538c99f9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -0,0 +1,592 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.security.MasterKeyData; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * AMRMProxyService is a service that runs on each node manager that can be used + * to intercept and inspect messages from application master to the cluster + * resource manager. It listens to messages from the application master and + * creates a request intercepting pipeline instance for each application. The + * pipeline is a chain of intercepter instances that can inspect and modify the + * request/response as needed. + */ +public class AMRMProxyService extends AbstractService implements + ApplicationMasterProtocol { + private static final Logger LOG = LoggerFactory + .getLogger(AMRMProxyService.class); + private Server server; + private final Context nmContext; + private final AsyncDispatcher dispatcher; + private InetSocketAddress listenerEndpoint; + private AMRMProxyTokenSecretManager secretManager; + private Map applPipelineMap; + + /** + * Creates an instance of the service. + * + * @param nmContext + * @param dispatcher + */ + public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) { + super(AMRMProxyService.class.getName()); + Preconditions.checkArgument(nmContext != null, "nmContext is null"); + Preconditions.checkArgument(dispatcher != null, "dispatcher is null"); + this.nmContext = nmContext; + this.dispatcher = dispatcher; + this.applPipelineMap = + new ConcurrentHashMap(); + + this.dispatcher.register(ApplicationEventType.class, + new ApplicationEventHandler()); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting AMRMProxyService"); + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + UserGroupInformation.setConfiguration(conf); + + this.listenerEndpoint = + conf.getSocketAddr(YarnConfiguration.AMRM_PROXY_ADDRESS, + YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS, + YarnConfiguration.DEFAULT_AMRM_PROXY_PORT); + + Configuration serverConf = new Configuration(conf); + serverConf.set( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + SaslRpcServer.AuthMethod.TOKEN.toString()); + + int numWorkerThreads = + serverConf.getInt( + YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT); + + this.secretManager = new AMRMProxyTokenSecretManager(serverConf); + this.secretManager.start(); + + this.server = + rpc.getServer(ApplicationMasterProtocol.class, this, + listenerEndpoint, serverConf, this.secretManager, + numWorkerThreads); + + this.server.start(); + LOG.info("AMRMProxyService listening on address: " + + this.server.getListenerAddress()); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping AMRMProxyService"); + if (this.server != null) { + this.server.stop(); + } + + this.secretManager.stop(); + + super.serviceStop(); + } + + /** + * This is called by the AMs started on this node to register with the RM. + * This method does the initial authorization and then forwards the request to + * the application instance specific intercepter chain. + */ + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + LOG.info("Registering application master." + " Host:" + + request.getHost() + " Port:" + request.getRpcPort() + + " Tracking Url:" + request.getTrackingUrl()); + RequestInterceptorChainWrapper pipeline = + authorizeAndGetInterceptorChain(); + return pipeline.getRootInterceptor() + .registerApplicationMaster(request); + } + + /** + * This is called by the AMs started on this node to unregister from the RM. + * This method does the initial authorization and then forwards the request to + * the application instance specific intercepter chain. + */ + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + LOG.info("Finishing application master. Tracking Url:" + + request.getTrackingUrl()); + RequestInterceptorChainWrapper pipeline = + authorizeAndGetInterceptorChain(); + return pipeline.getRootInterceptor().finishApplicationMaster(request); + } + + /** + * This is called by the AMs started on this node to send heart beat to RM. + * This method does the initial authorization and then forwards the request to + * the application instance specific pipeline, which is a chain of request + * intercepter objects. One application request processing pipeline is created + * per AM instance. + */ + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + AMRMTokenIdentifier amrmTokenIdentifier = + YarnServerSecurityUtils.authorizeRequest(); + RequestInterceptorChainWrapper pipeline = + getInterceptorChain(amrmTokenIdentifier); + AllocateResponse allocateResponse = + pipeline.getRootInterceptor().allocate(request); + + updateAMRMTokens(amrmTokenIdentifier, pipeline, allocateResponse); + + return allocateResponse; + } + + /** + * Callback from the ContainerManager implementation for initializing the + * application request processing pipeline. + * + * @param request - encapsulates information for starting an AM + * @throws IOException + * @throws YarnException + */ + public void processApplicationStartRequest(StartContainerRequest request) + throws IOException, YarnException { + LOG.info("Callback received for initializing request " + + "processing pipeline for an AM"); + ContainerTokenIdentifier containerTokenIdentifierForKey = + BuilderUtils.newContainerTokenIdentifier(request + .getContainerToken()); + ApplicationAttemptId appAttemptId = + containerTokenIdentifierForKey.getContainerID() + .getApplicationAttemptId(); + Credentials credentials = + YarnServerSecurityUtils.parseCredentials(request + .getContainerLaunchContext()); + + Token amrmToken = + getFirstAMRMToken(credentials.getAllTokens()); + if (amrmToken == null) { + throw new YarnRuntimeException( + "AMRMToken not found in the start container request for application:" + + appAttemptId.toString()); + } + + // Substitute the existing AMRM Token with a local one. Keep the rest of the + // tokens in the credentials intact. + Token localToken = + this.secretManager.createAndGetAMRMToken(appAttemptId); + credentials.addToken(localToken.getService(), localToken); + + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + request.getContainerLaunchContext().setTokens( + ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + + initializePipeline(containerTokenIdentifierForKey.getContainerID() + .getApplicationAttemptId(), + containerTokenIdentifierForKey.getApplicationSubmitter(), + amrmToken, localToken); + } + + /** + * Initializes the request intercepter pipeline for the specified application. + * + * @param applicationAttemptId + * @param user + * @param amrmToken + */ + protected void initializePipeline( + ApplicationAttemptId applicationAttemptId, String user, + Token amrmToken, + Token localToken) { + RequestInterceptorChainWrapper chainWrapper = null; + synchronized (applPipelineMap) { + if (applPipelineMap.containsKey(applicationAttemptId.getApplicationId())) { + LOG.warn("Request to start an already existing appId was received. " + + " This can happen if an application failed and a new attempt " + + "was created on this machine. ApplicationId: " + + applicationAttemptId.toString()); + return; + } + + chainWrapper = new RequestInterceptorChainWrapper(); + this.applPipelineMap.put(applicationAttemptId.getApplicationId(), + chainWrapper); + } + + // We register the pipeline instance in the map first and then initialize it + // later because chain initialization can be expensive and we would like to + // release the lock as soon as possible to prevent other applications from + // blocking when one application's chain is initializing + LOG.info("Initializing request processing pipeline for application. " + + " ApplicationId:" + applicationAttemptId + " for the user: " + + user); + + RequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(createApplicationMasterContext( + applicationAttemptId, user, amrmToken, localToken)); + chainWrapper.init(interceptorChain, applicationAttemptId); + } + + /** + * Shuts down the request processing pipeline for the specified application + * attempt id. + * + * @param applicationId + */ + protected void stopApplication(ApplicationId applicationId) { + Preconditions.checkArgument(applicationId != null, + "applicationId is null"); + RequestInterceptorChainWrapper pipeline = + this.applPipelineMap.remove(applicationId); + + if (pipeline == null) { + LOG.info("Request to stop an application that does not exist. Id:" + + applicationId); + } else { + LOG.info("Stopping the request processing pipeline for application: " + + applicationId); + try { + pipeline.getRootInterceptor().shutdown(); + } catch (Throwable ex) { + LOG.warn( + "Failed to shutdown the request processing pipeline for app:" + + applicationId, ex); + } + } + } + + private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, + RequestInterceptorChainWrapper pipeline, + AllocateResponse allocateResponse) { + AMRMProxyApplicationContextImpl context = + (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor() + .getApplicationContext(); + + // check to see if the RM has issued a new AMRMToken & accordingly update + // the real ARMRMToken in the current context + if (allocateResponse.getAMRMToken() != null) { + org.apache.hadoop.yarn.api.records.Token token = + allocateResponse.getAMRMToken(); + + org.apache.hadoop.security.token.Token newTokenId = + new org.apache.hadoop.security.token.Token( + token.getIdentifier().array(), token.getPassword().array(), + new Text(token.getKind()), new Text(token.getService())); + + context.setAMRMToken(newTokenId); + } + + // Check if the local AMRMToken is rolled up and update the context and + // response accordingly + MasterKeyData nextMasterKey = + this.secretManager.getNextMasterKeyData(); + + if (nextMasterKey != null + && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier + .getKeyId()) { + Token localToken = context.getLocalAMRMToken(); + if (nextMasterKey.getMasterKey().getKeyId() != context + .getLocalAMRMTokenKeyId()) { + LOG.info("The local AMRMToken has been rolled-over." + + " Send new local AMRMToken back to application: " + + pipeline.getApplicationId()); + localToken = + this.secretManager.createAndGetAMRMToken(pipeline + .getApplicationAttemptId()); + context.setLocalAMRMToken(localToken); + } + + allocateResponse + .setAMRMToken(org.apache.hadoop.yarn.api.records.Token + .newInstance(localToken.getIdentifier(), localToken + .getKind().toString(), localToken.getPassword(), + localToken.getService().toString())); + } + } + + private AMRMProxyApplicationContext createApplicationMasterContext( + ApplicationAttemptId applicationAttemptId, String user, + Token amrmToken, + Token localToken) { + AMRMProxyApplicationContextImpl appContext = + new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(), + applicationAttemptId, user, amrmToken, localToken); + return appContext; + } + + /** + * Gets the Request intercepter chains for all the applications. + * + * @return the request intercepter chains. + */ + protected Map getPipelines() { + return this.applPipelineMap; + } + + /** + * This method creates and returns reference of the first intercepter in the + * chain of request intercepter instances. + * + * @return the reference of the first intercepter in the chain + */ + protected RequestInterceptor createRequestInterceptorChain() { + Configuration conf = getConfig(); + + List interceptorClassNames = getInterceptorClassNames(conf); + + RequestInterceptor pipeline = null; + RequestInterceptor current = null; + for (String interceptorClassName : interceptorClassNames) { + try { + Class interceptorClass = + conf.getClassByName(interceptorClassName); + if (RequestInterceptor.class.isAssignableFrom(interceptorClass)) { + RequestInterceptor interceptorInstance = + (RequestInterceptor) ReflectionUtils.newInstance( + interceptorClass, conf); + if (pipeline == null) { + pipeline = interceptorInstance; + current = interceptorInstance; + continue; + } else { + current.setNextInterceptor(interceptorInstance); + current = interceptorInstance; + } + } else { + throw new YarnRuntimeException("Class: " + interceptorClassName + + " not instance of " + + RequestInterceptor.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate ApplicationMasterRequestInterceptor: " + + interceptorClassName, e); + } + } + + if (pipeline == null) { + throw new YarnRuntimeException( + "RequestInterceptor pipeline is not configured in the system"); + } + return pipeline; + } + + /** + * Returns the comma separated intercepter class names from the configuration. + * + * @param conf + * @return the intercepter class names as an instance of ArrayList + */ + private List getInterceptorClassNames(Configuration conf) { + String configuredInterceptorClassNames = + conf.get( + YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); + + List interceptorClassNames = new ArrayList(); + Collection tempList = + StringUtils.getStringCollection(configuredInterceptorClassNames); + for (String item : tempList) { + interceptorClassNames.add(item.trim()); + } + + return interceptorClassNames; + } + + /** + * Authorizes the request and returns the application specific request + * processing pipeline. + * + * @return the the intercepter wrapper instance + * @throws YarnException + */ + private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain() + throws YarnException { + AMRMTokenIdentifier tokenIdentifier = + YarnServerSecurityUtils.authorizeRequest(); + return getInterceptorChain(tokenIdentifier); + } + + private RequestInterceptorChainWrapper getInterceptorChain( + AMRMTokenIdentifier tokenIdentifier) throws YarnException { + ApplicationAttemptId appAttemptId = + tokenIdentifier.getApplicationAttemptId(); + + synchronized (this.applPipelineMap) { + if (!this.applPipelineMap.containsKey(appAttemptId + .getApplicationId())) { + throw new YarnException( + "The AM request processing pipeline is not initialized for app: " + + appAttemptId.getApplicationId().toString()); + } + + return this.applPipelineMap.get(appAttemptId.getApplicationId()); + } + } + + @SuppressWarnings("unchecked") + private Token getFirstAMRMToken( + Collection> allTokens) { + Iterator> iter = allTokens.iterator(); + while (iter.hasNext()) { + Token token = iter.next(); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + return (Token) token; + } + } + + return null; + } + + /** + * Private class for handling application stop events. + * + */ + class ApplicationEventHandler implements EventHandler { + + @Override + public void handle(ApplicationEvent event) { + Application app = + AMRMProxyService.this.nmContext.getApplications().get( + event.getApplicationID()); + if (app != null) { + switch (event.getType()) { + case FINISH_APPLICATION: + LOG.info("Application stop event received for stopping AppId:" + + event.getApplicationID().toString()); + AMRMProxyService.this.stopApplication(event.getApplicationID()); + break; + default: + LOG.debug("AMRMProxy is ignoring event: " + event.getType()); + break; + } + } else { + LOG.warn("Event " + event + " sent to absent application " + + event.getApplicationID()); + } + } + } + + /** + * Private structure for encapsulating RequestInterceptor and + * ApplicationAttemptId instances. + * + */ + private static class RequestInterceptorChainWrapper { + private RequestInterceptor rootInterceptor; + private ApplicationAttemptId applicationAttemptId; + + /** + * Initializes the wrapper with the specified parameters. + * + * @param rootInterceptor + * @param applicationAttemptId + */ + public synchronized void init(RequestInterceptor rootInterceptor, + ApplicationAttemptId applicationAttemptId) { + this.rootInterceptor = rootInterceptor; + this.applicationAttemptId = applicationAttemptId; + } + + /** + * Gets the root request intercepter. + * + * @return the root request intercepter + */ + public synchronized RequestInterceptor getRootInterceptor() { + return rootInterceptor; + } + + /** + * Gets the application attempt identifier. + * + * @return the application attempt identifier + */ + public synchronized ApplicationAttemptId getApplicationAttemptId() { + return applicationAttemptId; + } + + /** + * Gets the application identifier. + * + * @return the application identifier + */ + public synchronized ApplicationId getApplicationId() { + return applicationAttemptId.getApplicationId(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java new file mode 100644 index 00000000000..d09ce41d082 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.security.SecureRandom; +import java.util.HashSet; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.security.MasterKeyData; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This secret manager instance is used by the AMRMProxyService to generate and + * manage tokens. + */ +public class AMRMProxyTokenSecretManager extends + SecretManager { + + private static final Log LOG = LogFactory + .getLog(AMRMProxyTokenSecretManager.class); + + private int serialNo = new SecureRandom().nextInt(); + private MasterKeyData nextMasterKey; + private MasterKeyData currentMasterKey; + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + private final Timer timer; + private final long rollingInterval; + private final long activationDelay; + + private final Set appAttemptSet = + new HashSet(); + + /** + * Create an {@link AMRMProxyTokenSecretManager}. + */ + public AMRMProxyTokenSecretManager(Configuration conf) { + this.timer = new Timer(); + this.rollingInterval = + conf.getLong( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, + YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000; + // Adding delay = 1.5 * expiry interval makes sure that all active AMs get + // the updated shared-key. + this.activationDelay = + (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5); + LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval + + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + + " ms"); + if (rollingInterval <= activationDelay * 2) { + throw new IllegalArgumentException( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS + + " should be more than 3 X " + + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS); + } + } + + public void start() { + if (this.currentMasterKey == null) { + this.currentMasterKey = createNewMasterKey(); + } + this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval, + rollingInterval); + } + + public void stop() { + this.timer.cancel(); + } + + public void applicationMasterFinished(ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); + try { + LOG.info("Application finished, removing password for " + + appAttemptId); + this.appAttemptSet.remove(appAttemptId); + } finally { + this.writeLock.unlock(); + } + } + + private class MasterKeyRoller extends TimerTask { + @Override + public void run() { + rollMasterKey(); + } + } + + @Private + void rollMasterKey() { + this.writeLock.lock(); + try { + LOG.info("Rolling master-key for amrm-tokens"); + this.nextMasterKey = createNewMasterKey(); + this.timer.schedule(new NextKeyActivator(), this.activationDelay); + } finally { + this.writeLock.unlock(); + } + } + + private class NextKeyActivator extends TimerTask { + @Override + public void run() { + activateNextMasterKey(); + } + } + + public void activateNextMasterKey() { + this.writeLock.lock(); + try { + LOG.info("Activating next master key with id: " + + this.nextMasterKey.getMasterKey().getKeyId()); + this.currentMasterKey = this.nextMasterKey; + this.nextMasterKey = null; + } finally { + this.writeLock.unlock(); + } + } + + @Private + @VisibleForTesting + public MasterKeyData createNewMasterKey() { + this.writeLock.lock(); + try { + return new MasterKeyData(serialNo++, generateSecret()); + } finally { + this.writeLock.unlock(); + } + } + + public Token createAndGetAMRMToken( + ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); + try { + LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId); + AMRMTokenIdentifier identifier = + new AMRMTokenIdentifier(appAttemptId, getMasterKey() + .getMasterKey().getKeyId()); + byte[] password = this.createPassword(identifier); + appAttemptSet.add(appAttemptId); + return new Token(identifier.getBytes(), + password, identifier.getKind(), new Text()); + } finally { + this.writeLock.unlock(); + } + } + + // If nextMasterKey is not Null, then return nextMasterKey + // otherwise return currentMasterKey. + @VisibleForTesting + public MasterKeyData getMasterKey() { + this.readLock.lock(); + try { + return nextMasterKey == null ? currentMasterKey : nextMasterKey; + } finally { + this.readLock.unlock(); + } + } + + /** + * Retrieve the password for the given {@link AMRMTokenIdentifier}. Used by + * RPC layer to validate a remote {@link AMRMTokenIdentifier}. + */ + @Override + public byte[] retrievePassword(AMRMTokenIdentifier identifier) + throws InvalidToken { + this.readLock.lock(); + try { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to retrieve password for " + + applicationAttemptId); + } + if (!appAttemptSet.contains(applicationAttemptId)) { + throw new InvalidToken(applicationAttemptId + + " not found in AMRMProxyTokenSecretManager."); + } + if (identifier.getKeyId() == this.currentMasterKey.getMasterKey() + .getKeyId()) { + return createPassword(identifier.getBytes(), + this.currentMasterKey.getSecretKey()); + } else if (nextMasterKey != null + && identifier.getKeyId() == this.nextMasterKey.getMasterKey() + .getKeyId()) { + return createPassword(identifier.getBytes(), + this.nextMasterKey.getSecretKey()); + } + throw new InvalidToken("Invalid AMRMToken from " + + applicationAttemptId); + } finally { + this.readLock.unlock(); + } + } + + /** + * Creates an empty TokenId to be used for de-serializing an + * {@link AMRMTokenIdentifier} by the RPC layer. + */ + @Override + public AMRMTokenIdentifier createIdentifier() { + return new AMRMTokenIdentifier(); + } + + @Private + @VisibleForTesting + public MasterKeyData getNextMasterKeyData() { + this.readLock.lock(); + try { + return this.nextMasterKey; + } finally { + this.readLock.unlock(); + } + } + + @Override + @Private + protected byte[] createPassword(AMRMTokenIdentifier identifier) { + this.readLock.lock(); + try { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + LOG.info("Creating password for " + applicationAttemptId); + return createPassword(identifier.getBytes(), getMasterKey() + .getSecretKey()); + } finally { + this.readLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java new file mode 100644 index 00000000000..810dfa806ed --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +/** + * Implements the RequestInterceptor interface and provides common functionality + * which can can be used and/or extended by other concrete intercepter classes. + * + */ +public abstract class AbstractRequestInterceptor implements + RequestInterceptor { + private Configuration conf; + private AMRMProxyApplicationContext appContext; + private RequestInterceptor nextInterceptor; + + /** + * Sets the {@link RequestInterceptor} in the chain. + */ + @Override + public void setNextInterceptor(RequestInterceptor nextInterceptor) { + this.nextInterceptor = nextInterceptor; + } + + /** + * Sets the {@link Configuration}. + */ + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + if (this.nextInterceptor != null) { + this.nextInterceptor.setConf(conf); + } + } + + /** + * Gets the {@link Configuration}. + */ + @Override + public Configuration getConf() { + return this.conf; + } + + /** + * Initializes the {@link RequestInterceptor}. + */ + @Override + public void init(AMRMProxyApplicationContext appContext) { + Preconditions.checkState(this.appContext == null, + "init is called multiple times on this interceptor: " + + this.getClass().getName()); + this.appContext = appContext; + if (this.nextInterceptor != null) { + this.nextInterceptor.init(appContext); + } + } + + /** + * Disposes the {@link RequestInterceptor}. + */ + @Override + public void shutdown() { + if (this.nextInterceptor != null) { + this.nextInterceptor.shutdown(); + } + } + + /** + * Gets the next {@link RequestInterceptor} in the chain. + */ + @Override + public RequestInterceptor getNextInterceptor() { + return this.nextInterceptor; + } + + /** + * Gets the {@link AMRMProxyApplicationContext}. + */ + public AMRMProxyApplicationContext getApplicationContext() { + return this.appContext; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java new file mode 100644 index 00000000000..2c7939b0097 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extends the AbstractRequestInterceptor class and provides an implementation + * that simply forwards the AM requests to the cluster resource manager. + * + */ +public final class DefaultRequestInterceptor extends + AbstractRequestInterceptor { + private static final Logger LOG = LoggerFactory + .getLogger(DefaultRequestInterceptor.class); + private ApplicationMasterProtocol rmClient; + private UserGroupInformation user = null; + + @Override + public void init(AMRMProxyApplicationContext appContext) { + super.init(appContext); + try { + user = + UserGroupInformation.createProxyUser(appContext + .getApplicationAttemptId().toString(), UserGroupInformation + .getCurrentUser()); + user.addToken(appContext.getAMRMToken()); + final Configuration conf = this.getConf(); + + rmClient = + user.doAs(new PrivilegedExceptionAction() { + @Override + public ApplicationMasterProtocol run() throws Exception { + return ClientRMProxy.createRMProxy(conf, + ApplicationMasterProtocol.class); + } + }); + } catch (IOException e) { + String message = + "Error while creating of RM app master service proxy for attemptId:" + + appContext.getApplicationAttemptId().toString(); + if (user != null) { + message += ", user: " + user; + } + + LOG.info(message); + throw new YarnRuntimeException(message, e); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + final RegisterApplicationMasterRequest request) + throws YarnException, IOException { + LOG.info("Forwarding registration request to the real YARN RM"); + return rmClient.registerApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(final AllocateRequest request) + throws YarnException, IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the real YARN RM"); + } + AllocateResponse allocateResponse = rmClient.allocate(request); + if (allocateResponse.getAMRMToken() != null) { + updateAMRMToken(allocateResponse.getAMRMToken()); + } + + return allocateResponse; + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + final FinishApplicationMasterRequest request) throws YarnException, + IOException { + LOG.info("Forwarding finish application request to " + + "the real YARN Resource Manager"); + return rmClient.finishApplicationMaster(request); + } + + @Override + public void setNextInterceptor(RequestInterceptor next) { + throw new YarnRuntimeException( + "setNextInterceptor is being called on DefaultRequestInterceptor," + + "which should be the last one in the chain " + + "Check if the interceptor pipeline configuration is correct"); + } + + private void updateAMRMToken(Token token) throws IOException { + org.apache.hadoop.security.token.Token amrmToken = + new org.apache.hadoop.security.token.Token( + token.getIdentifier().array(), token.getPassword().array(), + new Text(token.getKind()), new Text(token.getService())); + // Preserve the token service sent by the RM when adding the token + // to ensure we replace the previous token setup by the RM. + // Afterwards we can update the service address for the RPC layer. + user.addToken(amrmToken); + amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf())); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java new file mode 100644 index 00000000000..c74c88fffcd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; + +/** + * Defines the contract to be implemented by the request intercepter classes, + * that can be used to intercept and inspect messages sent from the application + * master to the resource manager. + */ +public interface RequestInterceptor extends ApplicationMasterProtocol, + Configurable { + /** + * This method is called for initializing the intercepter. This is guaranteed + * to be called only once in the lifetime of this instance. + * + * @param ctx + */ + void init(AMRMProxyApplicationContext ctx); + + /** + * This method is called to release the resources held by the intercepter. + * This will be called when the application pipeline is being destroyed. The + * concrete implementations should dispose the resources and forward the + * request to the next intercepter, if any. + */ + void shutdown(); + + /** + * Sets the next intercepter in the pipeline. The concrete implementation of + * this interface should always pass the request to the nextInterceptor after + * inspecting the message. The last intercepter in the chain is responsible to + * send the messages to the resource manager service and so the last + * intercepter will not receive this method call. + * + * @param nextInterceptor + */ + void setNextInterceptor(RequestInterceptor nextInterceptor); + + /** + * Returns the next intercepter in the chain. + * + * @return the next intercepter in the chain + */ + RequestInterceptor getNextInterceptor(); + + /** + * Returns the context. + * + * @return the context + */ + AMRMProxyApplicationContext getApplicationContext(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 68c7f2c5473..a658e53439b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -42,7 +42,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; @@ -51,7 +50,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; @@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; @@ -103,6 +102,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -135,6 +135,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; @@ -172,6 +173,8 @@ public class ContainerManagerImpl extends CompositeService implements private boolean serviceStopped = false; private final ReadLock readLock; private final WriteLock writeLock; + private AMRMProxyService amrmProxyService; + private boolean amrmProxyEnabled = false; private long waitForContainersOnShutdownMillis; @@ -235,6 +238,20 @@ public void serviceInit(Configuration conf) throws Exception { addService(sharedCacheUploader); dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader); + amrmProxyEnabled = + conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); + + if (amrmProxyEnabled) { + LOG.info("AMRMProxyService is enabled. " + + "All the AM->RM requests will be intercepted by the proxy"); + this.amrmProxyService = + new AMRMProxyService(this.context, this.dispatcher); + addService(this.amrmProxyService); + } else { + LOG.info("AMRMProxyService is disabled"); + } + waitForContainersOnShutdownMillis = conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + @@ -246,6 +263,10 @@ public void serviceInit(Configuration conf) throws Exception { recover(); } + public boolean isARMRMProxyEnabled() { + return amrmProxyEnabled; + } + @SuppressWarnings("unchecked") private void recover() throws IOException, URISyntaxException { NMStateStoreService stateStore = context.getNMStateStore(); @@ -314,7 +335,8 @@ private void recoverContainer(RecoveredContainerState rcs) + " with exit code " + rcs.getExitCode()); if (context.getApplications().containsKey(appId)) { - Credentials credentials = parseCredentials(launchContext); + Credentials credentials = + YarnServerSecurityUtils.parseCredentials(launchContext); Container container = new ContainerImpl(getConfig(), dispatcher, context.getNMStateStore(), req.getContainerLaunchContext(), credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), @@ -737,8 +759,17 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, verifyAndGetContainerTokenIdentifier(request.getContainerToken(), containerTokenIdentifier); containerId = containerTokenIdentifier.getContainerID(); - startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, - request); + + // Initialize the AMRMProxy service instance only if the container is of + // type AM and if the AMRMProxy service is enabled + if (isARMRMProxyEnabled() + && containerTokenIdentifier.getContainerType().equals( + ContainerType.APPLICATION_MASTER)) { + this.amrmProxyService.processApplicationStartRequest(request); + } + + startContainerInternal(nmTokenIdentifier, + containerTokenIdentifier, request); succeededContainers.add(containerId); } catch (YarnException e) { failedContainers.put(containerId, SerializedException.newInstance(e)); @@ -751,7 +782,7 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, } return StartContainersResponse.newInstance(getAuxServiceMetaData(), - succeededContainers, failedContainers); + succeededContainers, failedContainers); } private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, @@ -844,7 +875,8 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, } } - Credentials credentials = parseCredentials(launchContext); + Credentials credentials = + YarnServerSecurityUtils.parseCredentials(launchContext); Container container = new ContainerImpl(getConfig(), this.dispatcher, @@ -928,27 +960,6 @@ protected void updateNMTokenIdentifier(NMTokenIdentifier nmTokenIdentifier) nmTokenIdentifier); } - private Credentials parseCredentials(ContainerLaunchContext launchContext) - throws IOException { - Credentials credentials = new Credentials(); - // //////////// Parse credentials - ByteBuffer tokens = launchContext.getTokens(); - - if (tokens != null) { - DataInputByteBuffer buf = new DataInputByteBuffer(); - tokens.rewind(); - buf.reset(tokens); - credentials.readTokenStorageStream(buf); - if (LOG.isDebugEnabled()) { - for (Token tk : credentials.getAllTokens()) { - LOG.debug(tk.getService() + " = " + tk.toString()); - } - } - } - // //////////// End of parsing credentials - return credentials; - } - /** * Stop a list of containers running on this NodeManager. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java new file mode 100644 index 00000000000..964379a411a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -0,0 +1,677 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +/** + * Base class for all the AMRMProxyService test cases. It provides utility + * methods that can be used by the concrete test case classes + * + */ +public abstract class BaseAMRMProxyTest { + private static final Log LOG = LogFactory + .getLog(BaseAMRMProxyTest.class); + /** + * The AMRMProxyService instance that will be used by all the test cases + */ + private MockAMRMProxyService amrmProxyService; + /** + * Thread pool used for asynchronous operations + */ + private static ExecutorService threadpool = Executors + .newCachedThreadPool(); + private Configuration conf; + private AsyncDispatcher dispatcher; + + protected MockAMRMProxyService getAMRMProxyService() { + Assert.assertNotNull(this.amrmProxyService); + return this.amrmProxyService; + } + + @Before + public void setUp() { + this.conf = new YarnConfiguration(); + this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + String mockPassThroughInterceptorClass = + PassThroughRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain will call the mock resource manager. The others in the chain will + // simply forward it to the next one in the chain + this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + + mockPassThroughInterceptorClass + "," + + mockPassThroughInterceptorClass + "," + + MockRequestInterceptor.class.getName()); + + this.dispatcher = new AsyncDispatcher(); + this.dispatcher.init(conf); + this.dispatcher.start(); + this.amrmProxyService = createAndStartAMRMProxyService(); + } + + @After + public void tearDown() { + amrmProxyService.stop(); + amrmProxyService = null; + this.dispatcher.stop(); + } + + protected ExecutorService getThreadPool() { + return threadpool; + } + + protected MockAMRMProxyService createAndStartAMRMProxyService() { + MockAMRMProxyService svc = + new MockAMRMProxyService(new NullContext(), dispatcher); + svc.init(conf); + svc.start(); + return svc; + } + + /** + * This helper method will invoke the specified function in parallel for each + * end point in the specified list using a thread pool and return the + * responses received from the function. It implements the logic required for + * dispatching requests in parallel and waiting for the responses. If any of + * the function call fails or times out, it will ignore and proceed with the + * rest. So the responses returned can be less than the number of end points + * specified + * + * @param testContext + * @param func + * @return + */ + protected List runInParallel(List testContexts, + final Function func) { + ExecutorCompletionService completionService = + new ExecutorCompletionService(this.getThreadPool()); + LOG.info("Sending requests to endpoints asynchronously. Number of test contexts=" + + testContexts.size()); + for (int index = 0; index < testContexts.size(); index++) { + final T testContext = testContexts.get(index); + + LOG.info("Adding request to threadpool for test context: " + + testContext.toString()); + + completionService.submit(new Callable() { + @Override + public R call() throws Exception { + LOG.info("Sending request. Test context:" + + testContext.toString()); + + R response = null; + try { + response = func.invoke(testContext); + LOG.info("Successfully sent request for context: " + + testContext.toString()); + } catch (Throwable ex) { + LOG.error("Failed to process request for context: " + + testContext); + response = null; + } + + return response; + } + }); + } + + ArrayList responseList = new ArrayList(); + LOG.info("Waiting for responses from endpoints. Number of contexts=" + + testContexts.size()); + for (int i = 0; i < testContexts.size(); ++i) { + try { + final Future future = completionService.take(); + final R response = future.get(3000, TimeUnit.MILLISECONDS); + responseList.add(response); + } catch (Throwable e) { + LOG.error("Failed to process request " + e.getMessage()); + } + } + + return responseList; + } + + /** + * Helper method to register an application master using specified testAppId + * as the application identifier and return the response + * + * @param testAppId + * @return + * @throws Exception + * @throws YarnException + * @throws IOException + */ + protected RegisterApplicationMasterResponse registerApplicationMaster( + final int testAppId) throws Exception, YarnException, IOException { + final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId); + + return ugi + .getUser() + .doAs( + new PrivilegedExceptionAction() { + @Override + public RegisterApplicationMasterResponse run() + throws Exception { + getAMRMProxyService().initApp( + ugi.getAppAttemptId(), + ugi.getUser().getUserName()); + + final RegisterApplicationMasterRequest req = + Records + .newRecord(RegisterApplicationMasterRequest.class); + req.setHost(Integer.toString(testAppId)); + req.setRpcPort(testAppId); + req.setTrackingUrl(""); + + RegisterApplicationMasterResponse response = + getAMRMProxyService().registerApplicationMaster(req); + return response; + } + }); + } + + /** + * Helper method that can be used to register multiple application masters in + * parallel to the specified RM end points + * + * @param testContexts - used to identify the requests + * @return + */ + protected List> registerApplicationMastersInParallel( + final ArrayList testContexts) { + List> responses = + runInParallel(testContexts, + new Function>() { + @Override + public RegisterApplicationMasterResponseInfo invoke( + T testContext) { + RegisterApplicationMasterResponseInfo response = null; + try { + int index = testContexts.indexOf(testContext); + response = + new RegisterApplicationMasterResponseInfo( + registerApplicationMaster(index), testContext); + Assert.assertNotNull(response.getResponse()); + Assert.assertEquals(Integer.toString(index), response + .getResponse().getQueue()); + + LOG.info("Sucessfully registered application master with test context: " + + testContext); + } catch (Throwable ex) { + response = null; + LOG.error("Failed to register application master with test context: " + + testContext); + } + + return response; + } + }); + + Assert.assertEquals( + "Number of responses received does not match with request", + testContexts.size(), responses.size()); + + Set contextResponses = new TreeSet(); + for (RegisterApplicationMasterResponseInfo item : responses) { + contextResponses.add(item.getTestContext()); + } + + for (T ep : testContexts) { + Assert.assertTrue(contextResponses.contains(ep)); + } + + return responses; + } + + /** + * Unregisters the application master for specified application id + * + * @param appId + * @param status + * @return + * @throws Exception + * @throws YarnException + * @throws IOException + */ + protected FinishApplicationMasterResponse finishApplicationMaster( + final int appId, final FinalApplicationStatus status) + throws Exception, YarnException, IOException { + + final ApplicationUserInfo ugi = getApplicationUserInfo(appId); + + return ugi.getUser().doAs( + new PrivilegedExceptionAction() { + @Override + public FinishApplicationMasterResponse run() throws Exception { + final FinishApplicationMasterRequest req = + Records.newRecord(FinishApplicationMasterRequest.class); + req.setDiagnostics(""); + req.setTrackingUrl(""); + req.setFinalApplicationStatus(status); + + FinishApplicationMasterResponse response = + getAMRMProxyService().finishApplicationMaster(req); + + getAMRMProxyService().stopApp( + ugi.getAppAttemptId().getApplicationId()); + + return response; + } + }); + } + + protected List> finishApplicationMastersInParallel( + final ArrayList testContexts) { + List> responses = + runInParallel(testContexts, + new Function>() { + @Override + public FinishApplicationMasterResponseInfo invoke( + T testContext) { + FinishApplicationMasterResponseInfo response = null; + try { + response = + new FinishApplicationMasterResponseInfo( + finishApplicationMaster( + testContexts.indexOf(testContext), + FinalApplicationStatus.SUCCEEDED), + testContext); + Assert.assertNotNull(response.getResponse()); + + LOG.info("Sucessfully finished application master with test contexts: " + + testContext); + } catch (Throwable ex) { + response = null; + LOG.error("Failed to finish application master with test context: " + + testContext); + } + + return response; + } + }); + + Assert.assertEquals( + "Number of responses received does not match with request", + testContexts.size(), responses.size()); + + Set contextResponses = new TreeSet(); + for (FinishApplicationMasterResponseInfo item : responses) { + Assert.assertNotNull(item); + Assert.assertNotNull(item.getResponse()); + contextResponses.add(item.getTestContext()); + } + + for (T ep : testContexts) { + Assert.assertTrue(contextResponses.contains(ep)); + } + + return responses; + } + + protected AllocateResponse allocate(final int testAppId) + throws Exception, YarnException, IOException { + final AllocateRequest req = Records.newRecord(AllocateRequest.class); + req.setResponseId(testAppId); + return allocate(testAppId, req); + } + + protected AllocateResponse allocate(final int testAppId, + final AllocateRequest request) throws Exception, YarnException, + IOException { + + final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId); + + return ugi.getUser().doAs( + new PrivilegedExceptionAction() { + @Override + public AllocateResponse run() throws Exception { + AllocateResponse response = + getAMRMProxyService().allocate(request); + return response; + } + }); + } + + protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) { + final ApplicationAttemptId attemptId = + getApplicationAttemptId(testAppId); + + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(attemptId.toString()); + AMRMTokenIdentifier token = new AMRMTokenIdentifier(attemptId, 1); + ugi.addTokenIdentifier(token); + return new ApplicationUserInfo(ugi, attemptId); + } + + protected List createResourceRequests(String[] hosts, + int memory, int vCores, int priority, int containers) + throws Exception { + return createResourceRequests(hosts, memory, vCores, priority, + containers, null); + } + + protected List createResourceRequests(String[] hosts, + int memory, int vCores, int priority, int containers, + String labelExpression) throws Exception { + List reqs = new ArrayList(); + for (String host : hosts) { + ResourceRequest hostReq = + createResourceRequest(host, memory, vCores, priority, + containers, labelExpression); + reqs.add(hostReq); + ResourceRequest rackReq = + createResourceRequest("/default-rack", memory, vCores, priority, + containers, labelExpression); + reqs.add(rackReq); + } + + ResourceRequest offRackReq = + createResourceRequest(ResourceRequest.ANY, memory, vCores, + priority, containers, labelExpression); + reqs.add(offRackReq); + return reqs; + } + + protected ResourceRequest createResourceRequest(String resource, + int memory, int vCores, int priority, int containers) + throws Exception { + return createResourceRequest(resource, memory, vCores, priority, + containers, null); + } + + protected ResourceRequest createResourceRequest(String resource, + int memory, int vCores, int priority, int containers, + String labelExpression) throws Exception { + ResourceRequest req = Records.newRecord(ResourceRequest.class); + req.setResourceName(resource); + req.setNumContainers(containers); + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(priority); + req.setPriority(pri); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(memory); + capability.setVirtualCores(vCores); + req.setCapability(capability); + if (labelExpression != null) { + req.setNodeLabelExpression(labelExpression); + } + return req; + } + + /** + * Returns an ApplicationId with the specified identifier + * + * @param testAppId + * @return + */ + protected ApplicationId getApplicationId(int testAppId) { + return ApplicationId.newInstance(123456, testAppId); + } + + /** + * Return an instance of ApplicationAttemptId using specified identifier. This + * identifier will be used for the ApplicationId too. + * + * @param testAppId + * @return + */ + protected ApplicationAttemptId getApplicationAttemptId(int testAppId) { + return ApplicationAttemptId.newInstance(getApplicationId(testAppId), + testAppId); + } + + /** + * Return an instance of ApplicationAttemptId using specified identifier and + * application id + * + * @param testAppId + * @return + */ + protected ApplicationAttemptId getApplicationAttemptId(int testAppId, + ApplicationId appId) { + return ApplicationAttemptId.newInstance(appId, testAppId); + } + + protected static class RegisterApplicationMasterResponseInfo { + private RegisterApplicationMasterResponse response; + private T testContext; + + RegisterApplicationMasterResponseInfo( + RegisterApplicationMasterResponse response, T testContext) { + this.response = response; + this.testContext = testContext; + } + + public RegisterApplicationMasterResponse getResponse() { + return response; + } + + public T getTestContext() { + return testContext; + } + } + + protected static class FinishApplicationMasterResponseInfo { + private FinishApplicationMasterResponse response; + private T testContext; + + FinishApplicationMasterResponseInfo( + FinishApplicationMasterResponse response, T testContext) { + this.response = response; + this.testContext = testContext; + } + + public FinishApplicationMasterResponse getResponse() { + return response; + } + + public T getTestContext() { + return testContext; + } + } + + protected static class ApplicationUserInfo { + private UserGroupInformation user; + private ApplicationAttemptId attemptId; + + ApplicationUserInfo(UserGroupInformation user, + ApplicationAttemptId attemptId) { + this.user = user; + this.attemptId = attemptId; + } + + public UserGroupInformation getUser() { + return this.user; + } + + public ApplicationAttemptId getAppAttemptId() { + return this.attemptId; + } + } + + protected static class MockAMRMProxyService extends AMRMProxyService { + public MockAMRMProxyService(Context nmContext, + AsyncDispatcher dispatcher) { + super(nmContext, dispatcher); + } + + /** + * This method is used by the test code to initialize the pipeline. In the + * actual service, the initialization is called by the + * ContainerManagerImpl::StartContainers method + * + * @param applicationId + * @param user + */ + public void initApp(ApplicationAttemptId applicationId, String user) { + super.initializePipeline(applicationId, user, null, null); + } + + public void stopApp(ApplicationId applicationId) { + super.stopApplication(applicationId); + } + } + + /** + * The Function interface is used for passing method pointers that can be + * invoked asynchronously at a later point. + */ + protected interface Function { + public R invoke(T input); + } + + protected class NullContext implements Context { + + @Override + public NodeId getNodeId() { + return null; + } + + @Override + public int getHttpPort() { + return 0; + } + + @Override + public ConcurrentMap getApplications() { + return null; + } + + @Override + public Map getSystemCredentialsForApps() { + return null; + } + + @Override + public ConcurrentMap getContainers() { + return null; + } + + @Override + public NMContainerTokenSecretManager getContainerTokenSecretManager() { + return null; + } + + @Override + public NMTokenSecretManagerInNM getNMTokenSecretManager() { + return null; + } + + @Override + public NodeHealthStatus getNodeHealthStatus() { + return null; + } + + @Override + public ContainerManagementProtocol getContainerManager() { + return null; + } + + @Override + public LocalDirsHandlerService getLocalDirsHandler() { + return null; + } + + @Override + public ApplicationACLsManager getApplicationACLsManager() { + return null; + } + + @Override + public NMStateStoreService getNMStateStore() { + return null; + } + + @Override + public boolean getDecommissioned() { + return false; + } + + @Override + public void setDecommissioned(boolean isDecommissioned) { + } + + @Override + public ConcurrentLinkedQueue getLogAggregationStatusForApps() { + return null; + } + + @Override + public NodeResourceMonitor getNodeResourceMonitor() { + return null; + } + + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java new file mode 100644 index 00000000000..c962f97a020 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class MockRequestInterceptor extends AbstractRequestInterceptor { + + private MockResourceManagerFacade mockRM; + + public MockRequestInterceptor() { + } + + public void init(AMRMProxyApplicationContext appContext) { + super.init(appContext); + mockRM = + new MockResourceManagerFacade(new YarnConfiguration( + super.getConf()), 0); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return mockRM.registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + return mockRM.finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + return mockRM.allocate(request); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java new file mode 100644 index 00000000000..7573a7a52bb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -0,0 +1,469 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.directory.api.util.Strings; +import org.apache.directory.api.util.exception.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.mortbay.log.Log; + +/** + * Mock Resource Manager facade implementation that exposes all the methods + * implemented by the YARN RM. The behavior and the values returned by this mock + * implementation is expected by the unit test cases. So please change the + * implementation with care. + */ +public class MockResourceManagerFacade implements + ApplicationMasterProtocol, ApplicationClientProtocol { + + private HashMap> applicationContainerIdMap = + new HashMap>(); + private HashMap allocatedContainerMap = + new HashMap(); + private AtomicInteger containerIndex = new AtomicInteger(0); + private Configuration conf; + + public MockResourceManagerFacade(Configuration conf, + int startContainerIndex) { + this.conf = conf; + this.containerIndex.set(startContainerIndex); + } + + private static String getAppIdentifier() throws IOException { + AMRMTokenIdentifier result = null; + UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); + Set tokenIds = remoteUgi.getTokenIdentifiers(); + for (TokenIdentifier tokenId : tokenIds) { + if (tokenId instanceof AMRMTokenIdentifier) { + result = (AMRMTokenIdentifier) tokenId; + break; + } + } + return result != null ? result.getApplicationAttemptId().toString() + : ""; + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + String amrmToken = getAppIdentifier(); + Log.info("Registering application attempt: " + amrmToken); + + synchronized (applicationContainerIdMap) { + Assert.assertFalse("The application id is already registered: " + + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); + // Keep track of the containers that are returned to this application + applicationContainerIdMap.put(amrmToken, + new ArrayList()); + } + + return RegisterApplicationMasterResponse.newInstance(null, null, null, + null, null, request.getHost(), null); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + String amrmToken = getAppIdentifier(); + Log.info("Finishing application attempt: " + amrmToken); + + synchronized (applicationContainerIdMap) { + // Remove the containers that were being tracked for this application + Assert.assertTrue("The application id is NOT registered: " + + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); + List ids = applicationContainerIdMap.remove(amrmToken); + for (ContainerId c : ids) { + allocatedContainerMap.remove(c); + } + } + + return FinishApplicationMasterResponse + .newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? true + : false); + } + + protected ApplicationId getApplicationId(int id) { + return ApplicationId.newInstance(12345, id); + } + + protected ApplicationAttemptId getApplicationAttemptId(int id) { + return ApplicationAttemptId.newInstance(getApplicationId(id), 1); + } + + @SuppressWarnings("deprecation") + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + if (request.getAskList() != null && request.getAskList().size() > 0 + && request.getReleaseList() != null + && request.getReleaseList().size() > 0) { + Assert.fail("The mock RM implementation does not support receiving " + + "askList and releaseList in the same heartbeat"); + } + + String amrmToken = getAppIdentifier(); + + ArrayList containerList = new ArrayList(); + if (request.getAskList() != null) { + for (ResourceRequest rr : request.getAskList()) { + for (int i = 0; i < rr.getNumContainers(); i++) { + ContainerId containerId = + ContainerId.newInstance(getApplicationAttemptId(1), + containerIndex.incrementAndGet()); + Container container = Records.newRecord(Container.class); + container.setId(containerId); + container.setPriority(rr.getPriority()); + + // We don't use the node for running containers in the test cases. So + // it is OK to hard code it to some dummy value + NodeId nodeId = + NodeId.newInstance( + !Strings.isEmpty(rr.getResourceName()) ? rr + .getResourceName() : "dummy", 1000); + container.setNodeId(nodeId); + container.setResource(rr.getCapability()); + containerList.add(container); + + synchronized (applicationContainerIdMap) { + // Keep track of the containers returned to this application. We + // will need it in future + Assert.assertTrue( + "The application id is Not registered before allocate(): " + + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List ids = + applicationContainerIdMap.get(amrmToken); + ids.add(containerId); + this.allocatedContainerMap.put(containerId, container); + } + } + } + } + + if (request.getReleaseList() != null + && request.getReleaseList().size() > 0) { + Log.info("Releasing containers: " + request.getReleaseList().size()); + synchronized (applicationContainerIdMap) { + Assert.assertTrue( + "The application id is not registered before allocate(): " + + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List ids = applicationContainerIdMap.get(amrmToken); + + for (ContainerId id : request.getReleaseList()) { + boolean found = false; + for (ContainerId c : ids) { + if (c.equals(id)) { + found = true; + break; + } + } + + Assert.assertTrue( + "ContainerId " + id + + " being released is not valid for application: " + + conf.get("AMRMTOKEN"), found); + + ids.remove(id); + + // Return the released container back to the AM with new fake Ids. The + // test case does not care about the IDs. The IDs are faked because + // otherwise the LRM will throw duplication identifier exception. This + // returning of fake containers is ONLY done for testing purpose - for + // the test code to get confirmation that the sub-cluster resource + // managers received the release request + ContainerId fakeContainerId = + ContainerId.newInstance(getApplicationAttemptId(1), + containerIndex.incrementAndGet()); + Container fakeContainer = allocatedContainerMap.get(id); + fakeContainer.setId(fakeContainerId); + containerList.add(fakeContainer); + } + } + } + + Log.info("Allocating containers: " + containerList.size() + + " for application attempt: " + conf.get("AMRMTOKEN")); + return AllocateResponse.newInstance(0, + new ArrayList(), containerList, + new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, + new ArrayList(), + new ArrayList(), + new ArrayList()); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, + IOException { + + GetApplicationReportResponse response = + Records.newRecord(GetApplicationReportResponse.class); + ApplicationReport report = Records.newRecord(ApplicationReport.class); + report.setYarnApplicationState(YarnApplicationState.ACCEPTED); + report.setApplicationId(request.getApplicationId()); + report.setCurrentApplicationAttemptId(ApplicationAttemptId + .newInstance(request.getApplicationId(), 1)); + response.setApplicationReport(report); + return response; + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) throws YarnException, + IOException { + GetApplicationAttemptReportResponse response = + Records.newRecord(GetApplicationAttemptReportResponse.class); + ApplicationAttemptReport report = + Records.newRecord(ApplicationAttemptReport.class); + report.setApplicationAttemptId(request.getApplicationAttemptId()); + report + .setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED); + response.setApplicationAttemptReport(report); + return response; + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return null; + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return null; + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetApplicationsResponse getApplications( + GetApplicationsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterNodesResponse getClusterNodes( + GetClusterNodesRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return null; + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) throws YarnException, + IOException { + return null; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java new file mode 100644 index 00000000000..97a844eb778 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Mock intercepter that does not do anything other than forwarding it to the + * next intercepter in the chain + * + */ +public class PassThroughRequestInterceptor extends + AbstractRequestInterceptor { + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return getNextInterceptor().registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + return getNextInterceptor().finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + return getNextInterceptor().allocate(request); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java new file mode 100644 index 00000000000..69b913a7c5c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -0,0 +1,484 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; + +public class TestAMRMProxyService extends BaseAMRMProxyTest { + + private static final Log LOG = LogFactory + .getLog(TestAMRMProxyService.class); + + /** + * Test if the pipeline is created properly. + */ + @Test + public void testRequestInterceptorChainCreation() throws Exception { + RequestInterceptor root = + super.getAMRMProxyService().createRequestInterceptorChain(); + int index = 0; + while (root != null) { + switch (index) { + case 0: + case 1: + case 2: + Assert.assertEquals(PassThroughRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + case 3: + Assert.assertEquals(MockRequestInterceptor.class.getName(), root + .getClass().getName()); + break; + } + + root = root.getNextInterceptor(); + index++; + } + + Assert.assertEquals( + "The number of interceptors in chain does not match", + Integer.toString(4), Integer.toString(index)); + + } + + /** + * Tests registration of a single application master. + * + * @throws Exception + */ + @Test + public void testRegisterOneApplicationMaster() throws Exception { + // The testAppId identifier is used as host name and the mock resource + // manager return it as the queue name. Assert that we received the queue + // name + int testAppId = 1; + RegisterApplicationMasterResponse response1 = + registerApplicationMaster(testAppId); + Assert.assertNotNull(response1); + Assert.assertEquals(Integer.toString(testAppId), response1.getQueue()); + } + + /** + * Tests the registration of multiple application master serially one at a + * time. + * + * @throws Exception + */ + @Test + public void testRegisterMulitpleApplicationMasters() throws Exception { + for (int testAppId = 0; testAppId < 3; testAppId++) { + RegisterApplicationMasterResponse response = + registerApplicationMaster(testAppId); + Assert.assertNotNull(response); + Assert + .assertEquals(Integer.toString(testAppId), response.getQueue()); + } + } + + /** + * Tests the registration of multiple application masters using multiple + * threads in parallel. + * + * @throws Exception + */ + @Test + public void testRegisterMulitpleApplicationMastersInParallel() + throws Exception { + int numberOfRequests = 5; + ArrayList testContexts = + CreateTestRequestIdentifiers(numberOfRequests); + super.registerApplicationMastersInParallel(testContexts); + } + + private ArrayList CreateTestRequestIdentifiers( + int numberOfRequests) { + ArrayList testContexts = new ArrayList(); + LOG.info("Creating " + numberOfRequests + " contexts for testing"); + for (int ep = 0; ep < numberOfRequests; ep++) { + testContexts.add("test-endpoint-" + Integer.toString(ep)); + LOG.info("Created test context: " + testContexts.get(ep)); + } + return testContexts; + } + + @Test + public void testFinishOneApplicationMasterWithSuccess() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId), + registerResponse.getQueue()); + + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(testAppId, + FinalApplicationStatus.SUCCEEDED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + } + + @Test + public void testFinishOneApplicationMasterWithFailure() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId), + registerResponse.getQueue()); + + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(false, finshResponse.getIsUnregistered()); + + try { + // Try to finish an application master that is already finished. + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("Finish registration failed as expected because it was not registered"); + } + } + + @Test + public void testFinishInvalidApplicationMaster() throws Exception { + try { + // Try to finish an application master that was not registered. + finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("Finish registration failed as expected because it was not registered"); + } + } + + @Test + public void testFinishMulitpleApplicationMasters() throws Exception { + int numberOfRequests = 3; + for (int index = 0; index < numberOfRequests; index++) { + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(index); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(index), + registerResponse.getQueue()); + } + + // Finish in reverse sequence + for (int index = numberOfRequests - 1; index >= 0; index--) { + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + + // Assert that the application has been removed from the collection + Assert.assertTrue(this.getAMRMProxyService() + .getPipelines().size() == index); + } + + try { + // Try to finish an application master that is already finished. + finishApplicationMaster(1, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("Finish registration failed as expected because it was not registered"); + } + + try { + // Try to finish an application master that was not registered. + finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("Finish registration failed as expected because it was not registered"); + } + } + + @Test + public void testFinishMulitpleApplicationMastersInParallel() + throws Exception { + int numberOfRequests = 5; + ArrayList testContexts = new ArrayList(); + LOG.info("Creating " + numberOfRequests + " contexts for testing"); + for (int i = 0; i < numberOfRequests; i++) { + testContexts.add("test-endpoint-" + Integer.toString(i)); + LOG.info("Created test context: " + testContexts.get(i)); + + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(i); + Assert.assertNotNull(registerResponse); + Assert + .assertEquals(Integer.toString(i), registerResponse.getQueue()); + } + + finishApplicationMastersInParallel(testContexts); + } + + @Test + public void testAllocateRequestWithNullValues() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId), + registerResponse.getQueue()); + + AllocateResponse allocateResponse = allocate(testAppId); + Assert.assertNotNull(allocateResponse); + + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(testAppId, + FinalApplicationStatus.SUCCEEDED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + } + + @Test + public void testAllocateRequestWithoutRegistering() throws Exception { + + try { + // Try to allocate an application master without registering. + allocate(1); + Assert + .fail("The request to allocate application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("AllocateRequest failed as expected because AM was not registered"); + } + } + + @Test + public void testAllocateWithOneResourceRequest() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + getContainersAndAssert(testAppId, 1); + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + } + + @Test + public void testAllocateWithMultipleResourceRequest() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + getContainersAndAssert(testAppId, 10); + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + } + + @Test + public void testAllocateAndReleaseContainers() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + List containers = getContainersAndAssert(testAppId, 10); + releaseContainersAndAssert(testAppId, containers); + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + } + + @Test + public void testAllocateAndReleaseContainersForMultipleAM() + throws Exception { + int numberOfApps = 5; + for (int testAppId = 0; testAppId < numberOfApps; testAppId++) { + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + List containers = getContainersAndAssert(testAppId, 10); + releaseContainersAndAssert(testAppId, containers); + } + for (int testAppId = 0; testAppId < numberOfApps; testAppId++) { + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + } + } + + @Test + public void testAllocateAndReleaseContainersForMultipleAMInParallel() + throws Exception { + int numberOfApps = 6; + ArrayList tempAppIds = new ArrayList(); + for (int i = 0; i < numberOfApps; i++) { + tempAppIds.add(new Integer(i)); + } + + final ArrayList appIds = tempAppIds; + List responses = + runInParallel(appIds, new Function() { + @Override + public Integer invoke(Integer testAppId) { + try { + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull("response is null", registerResponse); + List containers = + getContainersAndAssert(testAppId, 10); + releaseContainersAndAssert(testAppId, containers); + + LOG.info("Sucessfully registered application master with appId: " + + testAppId); + } catch (Throwable ex) { + LOG.error( + "Failed to register application master with appId: " + + testAppId, ex); + testAppId = null; + } + + return testAppId; + } + }); + + Assert.assertEquals( + "Number of responses received does not match with request", + appIds.size(), responses.size()); + + for (Integer testAppId : responses) { + Assert.assertNotNull(testAppId); + finishApplicationMaster(testAppId.intValue(), + FinalApplicationStatus.SUCCEEDED); + } + } + + private List getContainersAndAssert(int appId, + int numberOfResourceRequests) throws Exception { + AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(1); + + List containers = + new ArrayList(numberOfResourceRequests); + List askList = + new ArrayList(numberOfResourceRequests); + for (int testAppId = 0; testAppId < numberOfResourceRequests; testAppId++) { + askList.add(createResourceRequest( + "test-node-" + Integer.toString(testAppId), 6000, 2, + testAppId % 5, 1)); + } + + allocateRequest.setAskList(askList); + + AllocateResponse allocateResponse = allocate(appId, allocateRequest); + Assert.assertNotNull("allocate() returned null response", + allocateResponse); + + containers.addAll(allocateResponse.getAllocatedContainers()); + + // Send max 10 heart beats to receive all the containers. If not, we will + // fail the test + int numHeartbeat = 0; + while (containers.size() < askList.size() && numHeartbeat++ < 10) { + allocateResponse = + allocate(appId, Records.newRecord(AllocateRequest.class)); + Assert.assertNotNull("allocate() returned null response", + allocateResponse); + + containers.addAll(allocateResponse.getAllocatedContainers()); + + LOG.info("Number of allocated containers in this request: " + + Integer.toString(allocateResponse.getAllocatedContainers() + .size())); + LOG.info("Total number of allocated containers: " + + Integer.toString(containers.size())); + Thread.sleep(10); + } + + // We broadcast the request, the number of containers we received will be + // higher than we ask + Assert.assertTrue("The asklist count is not same as response", + askList.size() <= containers.size()); + return containers; + } + + private void releaseContainersAndAssert(int appId, + List containers) throws Exception { + Assert.assertTrue(containers.size() > 0); + AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(1); + + List relList = + new ArrayList(containers.size()); + for (Container container : containers) { + relList.add(container.getId()); + } + + allocateRequest.setReleaseList(relList); + + AllocateResponse allocateResponse = allocate(appId, allocateRequest); + Assert.assertNotNull(allocateResponse); + + // The way the mock resource manager is setup, it will return the containers + // that were released in the response. This is done because the UAMs run + // asynchronously and we need to if all the resource managers received the + // release it. The containers sent by the mock resource managers will be + // aggregated and returned back to us and we can assert if all the release + // lists reached the sub-clusters + List containersForReleasedContainerIds = + new ArrayList(); + containersForReleasedContainerIds.addAll(allocateResponse + .getAllocatedContainers()); + + // Send max 10 heart beats to receive all the containers. If not, we will + // fail the test + int numHeartbeat = 0; + while (containersForReleasedContainerIds.size() < relList.size() + && numHeartbeat++ < 10) { + allocateResponse = + allocate(appId, Records.newRecord(AllocateRequest.class)); + Assert.assertNotNull(allocateResponse); + containersForReleasedContainerIds.addAll(allocateResponse + .getAllocatedContainers()); + + LOG.info("Number of containers received in this request: " + + Integer.toString(allocateResponse.getAllocatedContainers() + .size())); + LOG.info("Total number of containers received: " + + Integer.toString(containersForReleasedContainerIds.size())); + Thread.sleep(10); + } + + Assert.assertEquals(relList.size(), + containersForReleasedContainerIds.size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index c8b985df51c..14142dee900 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -40,9 +40,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -76,7 +74,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; @@ -95,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -175,69 +173,13 @@ public InetSocketAddress getBindAddress() { return this.masterServiceAddress; } - // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer - // currently sets only the required id, but iterate through anyways just to be - // sure. - private AMRMTokenIdentifier selectAMRMTokenIdentifier( - UserGroupInformation remoteUgi) throws IOException { - AMRMTokenIdentifier result = null; - Set tokenIds = remoteUgi.getTokenIdentifiers(); - for (TokenIdentifier tokenId : tokenIds) { - if (tokenId instanceof AMRMTokenIdentifier) { - result = (AMRMTokenIdentifier) tokenId; - break; - } - } - - return result; - } - - private AMRMTokenIdentifier authorizeRequest() - throws YarnException { - - UserGroupInformation remoteUgi; - try { - remoteUgi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - String msg = - "Cannot obtain the user-name for authorizing ApplicationMaster. " - + "Got exception: " + StringUtils.stringifyException(e); - LOG.warn(msg); - throw RPCUtil.getRemoteException(msg); - } - - boolean tokenFound = false; - String message = ""; - AMRMTokenIdentifier appTokenIdentifier = null; - try { - appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi); - if (appTokenIdentifier == null) { - tokenFound = false; - message = "No AMRMToken found for user " + remoteUgi.getUserName(); - } else { - tokenFound = true; - } - } catch (IOException e) { - tokenFound = false; - message = - "Got exception while looking for AMRMToken for user " - + remoteUgi.getUserName(); - } - - if (!tokenFound) { - LOG.warn(message); - throw RPCUtil.getRemoteException(message); - } - - return appTokenIdentifier; - } - @Override public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { - AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + AMRMTokenIdentifier amrmTokenIdentifier = + YarnServerSecurityUtils.authorizeRequest(); ApplicationAttemptId applicationAttemptId = amrmTokenIdentifier.getApplicationAttemptId(); @@ -346,7 +288,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( IOException { ApplicationAttemptId applicationAttemptId = - authorizeRequest().getApplicationAttemptId(); + YarnServerSecurityUtils.authorizeRequest().getApplicationAttemptId(); ApplicationId appId = applicationAttemptId.getApplicationId(); RMApp rmApp = @@ -430,7 +372,8 @@ public boolean hasApplicationMasterRegistered( public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { - AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + AMRMTokenIdentifier amrmTokenIdentifier = + YarnServerSecurityUtils.authorizeRequest(); ApplicationAttemptId appAttemptId = amrmTokenIdentifier.getApplicationAttemptId();