YARN-5531. UnmanagedAM pool manager for federating application across clusters. (Botong Huang via Subru).
(cherry picked from commit 73bb2102ce
)
This commit is contained in:
parent
7444406d6d
commit
859aa1f9d6
|
@ -0,0 +1,35 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.util;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generic interface that can be used for calling back when a corresponding
|
||||||
|
* asynchronous operation completes.
|
||||||
|
*
|
||||||
|
* @param <T> parameter type for the callback
|
||||||
|
*/
|
||||||
|
public interface AsyncCallback<T> {
|
||||||
|
/**
|
||||||
|
* This method is called back when the corresponding asynchronous operation
|
||||||
|
* completes.
|
||||||
|
*
|
||||||
|
* @param response response of the callback
|
||||||
|
*/
|
||||||
|
void callback(T response);
|
||||||
|
}
|
|
@ -19,22 +19,20 @@
|
||||||
package org.apache.hadoop.yarn.server.federation.failover;
|
package org.apache.hadoop.yarn.server.federation.failover;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
||||||
import org.apache.hadoop.security.SaslRpcServer;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
|
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -51,10 +49,15 @@ public final class FederationProxyProviderUtil {
|
||||||
public static final Logger LOG =
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(FederationProxyProviderUtil.class);
|
LoggerFactory.getLogger(FederationProxyProviderUtil.class);
|
||||||
|
|
||||||
|
// Disable constructor
|
||||||
|
private FederationProxyProviderUtil() {
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a proxy for the specified protocol. For non-HA, this is a direct
|
* Create a proxy for the specified protocol in the context of Federation. For
|
||||||
* connection to the ResourceManager address. When HA is enabled, the proxy
|
* non-HA, this is a direct connection to the ResourceManager address. When HA
|
||||||
* handles the failover between the ResourceManagers as well.
|
* is enabled, the proxy handles the failover between the ResourceManagers as
|
||||||
|
* well.
|
||||||
*
|
*
|
||||||
* @param configuration Configuration to generate {@link ClientRMProxy}
|
* @param configuration Configuration to generate {@link ClientRMProxy}
|
||||||
* @param protocol Protocol for the proxy
|
* @param protocol Protocol for the proxy
|
||||||
|
@ -67,15 +70,16 @@ public final class FederationProxyProviderUtil {
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public static <T> T createRMProxy(Configuration configuration,
|
public static <T> T createRMProxy(Configuration configuration,
|
||||||
final Class<T> protocol, SubClusterId subClusterId,
|
Class<T> protocol, SubClusterId subClusterId, UserGroupInformation user)
|
||||||
UserGroupInformation user) throws IOException {
|
throws IOException {
|
||||||
return createRMProxy(configuration, protocol, subClusterId, user, null);
|
return createRMProxy(configuration, protocol, subClusterId, user, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a proxy for the specified protocol. For non-HA, this is a direct
|
* Create a proxy for the specified protocol in the context of Federation. For
|
||||||
* connection to the ResourceManager address. When HA is enabled, the proxy
|
* non-HA, this is a direct connection to the ResourceManager address. When HA
|
||||||
* handles the failover between the ResourceManagers as well.
|
* is enabled, the proxy handles the failover between the ResourceManagers as
|
||||||
|
* well.
|
||||||
*
|
*
|
||||||
* @param configuration Configuration to generate {@link ClientRMProxy}
|
* @param configuration Configuration to generate {@link ClientRMProxy}
|
||||||
* @param protocol Protocol for the proxy
|
* @param protocol Protocol for the proxy
|
||||||
|
@ -88,65 +92,35 @@ public final class FederationProxyProviderUtil {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
@SuppressWarnings("unchecked")
|
public static <T> T createRMProxy(Configuration configuration,
|
||||||
public static <T> T createRMProxy(final Configuration configuration,
|
|
||||||
final Class<T> protocol, SubClusterId subClusterId,
|
final Class<T> protocol, SubClusterId subClusterId,
|
||||||
UserGroupInformation user, final Token token) throws IOException {
|
UserGroupInformation user, Token<? extends TokenIdentifier> token)
|
||||||
try {
|
throws IOException {
|
||||||
final YarnConfiguration conf = new YarnConfiguration(configuration);
|
final YarnConfiguration config = new YarnConfiguration(configuration);
|
||||||
updateConf(conf, subClusterId);
|
updateConfForFederation(config, subClusterId.getId());
|
||||||
if (token != null) {
|
return AMRMClientUtils.createRMProxy(config, protocol, user, token);
|
||||||
LOG.info(
|
|
||||||
"Creating RMProxy with a token: {} to subcluster: {}"
|
|
||||||
+ " for protocol: {}",
|
|
||||||
token, subClusterId, protocol.getSimpleName());
|
|
||||||
user.addToken(token);
|
|
||||||
setAuthModeInConf(conf);
|
|
||||||
} else {
|
|
||||||
LOG.info("Creating RMProxy without a token to subcluster: {}"
|
|
||||||
+ " for protocol: {}", subClusterId, protocol.getSimpleName());
|
|
||||||
}
|
|
||||||
final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
|
|
||||||
@Override
|
|
||||||
public T run() throws Exception {
|
|
||||||
return ClientRMProxy.createRMProxy(conf, protocol);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return proxyConnection;
|
|
||||||
} catch (IOException e) {
|
|
||||||
String message =
|
|
||||||
"Error while creating of RM application master service proxy for"
|
|
||||||
+ " appAttemptId: " + user;
|
|
||||||
LOG.info(message);
|
|
||||||
throw new YarnRuntimeException(message, e);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new YarnRuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setAuthModeInConf(Configuration conf) {
|
/**
|
||||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
* Updating the conf with Federation as long as certain subclusterId.
|
||||||
SaslRpcServer.AuthMethod.TOKEN.toString());
|
*
|
||||||
}
|
* @param conf configuration
|
||||||
|
* @param subClusterId subclusterId for the conf
|
||||||
// updating the conf with the refreshed RM addresses as proxy creations
|
*/
|
||||||
// are based out of conf
|
public static void updateConfForFederation(Configuration conf,
|
||||||
private static void updateConf(Configuration conf,
|
String subClusterId) {
|
||||||
SubClusterId subClusterId) {
|
conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId);
|
||||||
conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
|
/*
|
||||||
// In a Federation setting, we will connect to not just the local cluster RM
|
* In a Federation setting, we will connect to not just the local cluster RM
|
||||||
// but also multiple external RMs. The membership information of all the RMs
|
* but also multiple external RMs. The membership information of all the RMs
|
||||||
// that are currently
|
* that are currently participating in Federation is available in the
|
||||||
// participating in Federation is available in the central
|
* central FederationStateStore. So we will: 1. obtain the RM service
|
||||||
// FederationStateStore.
|
* addresses from FederationStateStore using the
|
||||||
// So we will:
|
* FederationRMFailoverProxyProvider. 2. disable traditional HA as that
|
||||||
// 1. obtain the RM service addresses from FederationStateStore using the
|
* depends on local configuration lookup for RMs using indexes. 3. we will
|
||||||
// FederationRMFailoverProxyProvider.
|
* enable federation failover IF traditional HA is enabled so that the
|
||||||
// 2. disable traditional HA as that depends on local configuration lookup
|
* appropriate failover RetryPolicy is initialized.
|
||||||
// for RMs using indexes.
|
*/
|
||||||
// 3. we will enable federation failover IF traditional HA is enabled so
|
|
||||||
// that the appropriate failover RetryPolicy is initialized.
|
|
||||||
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
|
||||||
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
|
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
|
||||||
FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class);
|
FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class);
|
||||||
|
@ -156,8 +130,4 @@ public final class FederationProxyProviderUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// disable instantiation
|
|
||||||
private FederationProxyProviderUtil() {
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,311 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.uam;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.AsyncCallback;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A service that manages a pool of UAM managers in
|
||||||
|
* {@link UnmanagedApplicationManager}.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public class UnmanagedAMPoolManager extends AbstractService {
|
||||||
|
public static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(UnmanagedAMPoolManager.class);
|
||||||
|
|
||||||
|
// Map from uamId to UAM instances
|
||||||
|
private Map<String, UnmanagedApplicationManager> unmanagedAppMasterMap;
|
||||||
|
|
||||||
|
private Map<String, ApplicationAttemptId> attemptIdMap;
|
||||||
|
|
||||||
|
private ExecutorService threadpool;
|
||||||
|
|
||||||
|
public UnmanagedAMPoolManager(ExecutorService threadpool) {
|
||||||
|
super(UnmanagedAMPoolManager.class.getName());
|
||||||
|
this.threadpool = threadpool;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
if (this.threadpool == null) {
|
||||||
|
this.threadpool = Executors.newCachedThreadPool();
|
||||||
|
}
|
||||||
|
this.unmanagedAppMasterMap = new ConcurrentHashMap<>();
|
||||||
|
this.attemptIdMap = new ConcurrentHashMap<>();
|
||||||
|
super.serviceStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Normally we should finish all applications before stop. If there are still
|
||||||
|
* UAMs running, force kill all of them. Do parallel kill because of
|
||||||
|
* performance reasons.
|
||||||
|
*
|
||||||
|
* TODO: move waiting for the kill to finish into a separate thread, without
|
||||||
|
* blocking the serviceStop.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
ExecutorCompletionService<KillApplicationResponse> completionService =
|
||||||
|
new ExecutorCompletionService<>(this.threadpool);
|
||||||
|
if (this.unmanagedAppMasterMap.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save a local copy of the key set so that it won't change with the map
|
||||||
|
Set<String> addressList =
|
||||||
|
new HashSet<>(this.unmanagedAppMasterMap.keySet());
|
||||||
|
LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map",
|
||||||
|
addressList.size());
|
||||||
|
|
||||||
|
for (final String uamId : addressList) {
|
||||||
|
completionService.submit(new Callable<KillApplicationResponse>() {
|
||||||
|
@Override
|
||||||
|
public KillApplicationResponse call() throws Exception {
|
||||||
|
try {
|
||||||
|
LOG.info("Force-killing UAM id " + uamId + " for application "
|
||||||
|
+ attemptIdMap.get(uamId));
|
||||||
|
return unmanagedAppMasterMap.remove(uamId).forceKillApplication();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Failed to kill unmanaged application master", e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < addressList.size(); ++i) {
|
||||||
|
try {
|
||||||
|
Future<KillApplicationResponse> future = completionService.take();
|
||||||
|
future.get();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Failed to kill unmanaged application master", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.attemptIdMap.clear();
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new UAM and register the application, without specifying uamId and
|
||||||
|
* appId. We will ask for an appId from RM and use it as the uamId.
|
||||||
|
*
|
||||||
|
* @param registerRequest RegisterApplicationMasterRequest
|
||||||
|
* @param conf configuration for this UAM
|
||||||
|
* @param queueName queue of the application
|
||||||
|
* @param submitter submitter name of the UAM
|
||||||
|
* @param appNameSuffix application name suffix for the UAM
|
||||||
|
* @return uamId for the UAM
|
||||||
|
* @throws YarnException if registerApplicationMaster fails
|
||||||
|
* @throws IOException if registerApplicationMaster fails
|
||||||
|
*/
|
||||||
|
public String createAndRegisterNewUAM(
|
||||||
|
RegisterApplicationMasterRequest registerRequest, Configuration conf,
|
||||||
|
String queueName, String submitter, String appNameSuffix)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
ApplicationId appId = null;
|
||||||
|
ApplicationClientProtocol rmClient;
|
||||||
|
try {
|
||||||
|
UserGroupInformation appSubmitter =
|
||||||
|
UserGroupInformation.createRemoteUser(submitter);
|
||||||
|
rmClient = AMRMClientUtils.createRMProxy(conf,
|
||||||
|
ApplicationClientProtocol.class, appSubmitter, null);
|
||||||
|
|
||||||
|
// Get a new appId from RM
|
||||||
|
GetNewApplicationResponse response =
|
||||||
|
rmClient.getNewApplication(GetNewApplicationRequest.newInstance());
|
||||||
|
if (response == null) {
|
||||||
|
throw new YarnException("getNewApplication got null response");
|
||||||
|
}
|
||||||
|
appId = response.getApplicationId();
|
||||||
|
LOG.info("Received new application ID {} from RM", appId);
|
||||||
|
} finally {
|
||||||
|
rmClient = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId,
|
||||||
|
queueName, submitter, appNameSuffix);
|
||||||
|
return appId.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new UAM and register the application, using the provided uamId and
|
||||||
|
* appId.
|
||||||
|
*
|
||||||
|
* @param uamId identifier for the UAM
|
||||||
|
* @param registerRequest RegisterApplicationMasterRequest
|
||||||
|
* @param conf configuration for this UAM
|
||||||
|
* @param appId application id for the UAM
|
||||||
|
* @param queueName queue of the application
|
||||||
|
* @param submitter submitter name of the UAM
|
||||||
|
* @param appNameSuffix application name suffix for the UAM
|
||||||
|
* @return RegisterApplicationMasterResponse
|
||||||
|
* @throws YarnException if registerApplicationMaster fails
|
||||||
|
* @throws IOException if registerApplicationMaster fails
|
||||||
|
*/
|
||||||
|
public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId,
|
||||||
|
RegisterApplicationMasterRequest registerRequest, Configuration conf,
|
||||||
|
ApplicationId appId, String queueName, String submitter,
|
||||||
|
String appNameSuffix) throws YarnException, IOException {
|
||||||
|
|
||||||
|
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
|
||||||
|
throw new YarnException("UAM " + uamId + " already exists");
|
||||||
|
}
|
||||||
|
UnmanagedApplicationManager uam =
|
||||||
|
createUAM(conf, appId, queueName, submitter, appNameSuffix);
|
||||||
|
// Put the UAM into map first before initializing it to avoid additional UAM
|
||||||
|
// for the same uamId being created concurrently
|
||||||
|
this.unmanagedAppMasterMap.put(uamId, uam);
|
||||||
|
|
||||||
|
RegisterApplicationMasterResponse response = null;
|
||||||
|
try {
|
||||||
|
LOG.info("Creating and registering UAM id {} for application {}", uamId,
|
||||||
|
appId);
|
||||||
|
response = uam.createAndRegisterApplicationMaster(registerRequest);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// Add the map earlier and remove here if register failed because we want
|
||||||
|
// to make sure there is only one uam instance per uamId at any given time
|
||||||
|
this.unmanagedAppMasterMap.remove(uamId);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.attemptIdMap.put(uamId, uam.getAttemptId());
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the UAM instance. Pull out to make unit test easy.
|
||||||
|
*
|
||||||
|
* @param conf Configuration
|
||||||
|
* @param appId application id
|
||||||
|
* @param queueName queue of the application
|
||||||
|
* @param submitter submitter name of the application
|
||||||
|
* @param appNameSuffix application name suffix
|
||||||
|
* @return the UAM instance
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
protected UnmanagedApplicationManager createUAM(Configuration conf,
|
||||||
|
ApplicationId appId, String queueName, String submitter,
|
||||||
|
String appNameSuffix) {
|
||||||
|
return new UnmanagedApplicationManager(conf, appId, queueName, submitter,
|
||||||
|
appNameSuffix);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* AllocateAsync to an UAM.
|
||||||
|
*
|
||||||
|
* @param uamId identifier for the UAM
|
||||||
|
* @param request AllocateRequest
|
||||||
|
* @param callback callback for response
|
||||||
|
* @throws YarnException if allocate fails
|
||||||
|
* @throws IOException if allocate fails
|
||||||
|
*/
|
||||||
|
public void allocateAsync(String uamId, AllocateRequest request,
|
||||||
|
AsyncCallback<AllocateResponse> callback)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
|
||||||
|
throw new YarnException("UAM " + uamId + " does not exist");
|
||||||
|
}
|
||||||
|
this.unmanagedAppMasterMap.get(uamId).allocateAsync(request, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finish an UAM/application.
|
||||||
|
*
|
||||||
|
* @param uamId identifier for the UAM
|
||||||
|
* @param request FinishApplicationMasterRequest
|
||||||
|
* @return FinishApplicationMasterResponse
|
||||||
|
* @throws YarnException if finishApplicationMaster call fails
|
||||||
|
* @throws IOException if finishApplicationMaster call fails
|
||||||
|
*/
|
||||||
|
public FinishApplicationMasterResponse finishApplicationMaster(String uamId,
|
||||||
|
FinishApplicationMasterRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
|
||||||
|
throw new YarnException("UAM " + uamId + " does not exist");
|
||||||
|
}
|
||||||
|
LOG.info("Finishing application for UAM id {} ", uamId);
|
||||||
|
FinishApplicationMasterResponse response =
|
||||||
|
this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request);
|
||||||
|
|
||||||
|
if (response.getIsUnregistered()) {
|
||||||
|
// Only remove the UAM when the unregister finished
|
||||||
|
this.unmanagedAppMasterMap.remove(uamId);
|
||||||
|
this.attemptIdMap.remove(uamId);
|
||||||
|
LOG.info("UAM id {} is unregistered", uamId);
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the id of all running UAMs.
|
||||||
|
*
|
||||||
|
* @return uamId set
|
||||||
|
*/
|
||||||
|
public Set<String> getAllUAMIds() {
|
||||||
|
// Return a clone of the current id set for concurrency reasons, so that the
|
||||||
|
// returned map won't change with the actual map
|
||||||
|
return new HashSet<String>(this.unmanagedAppMasterMap.keySet());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return whether an UAM exists.
|
||||||
|
*
|
||||||
|
* @param uamId identifier for the UAM
|
||||||
|
* @return UAM exists or not
|
||||||
|
*/
|
||||||
|
public boolean hasUAMId(String uamId) {
|
||||||
|
return this.unmanagedAppMasterMap.containsKey(uamId);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,607 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.uam;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.Thread.UncaughtExceptionHandler;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.AsyncCallback;
|
||||||
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* UnmanagedApplicationManager is used to register unmanaged application and
|
||||||
|
* negotiate for resources from resource managers. An unmanagedAM is an AM that
|
||||||
|
* is not launched and managed by the RM. Allocate calls are handled
|
||||||
|
* asynchronously using {@link AsyncCallback}.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public class UnmanagedApplicationManager {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(UnmanagedApplicationManager.class);
|
||||||
|
private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000;
|
||||||
|
private static final String APP_NAME = "UnmanagedAM";
|
||||||
|
private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name";
|
||||||
|
|
||||||
|
private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
|
||||||
|
private AMRequestHandlerThread handlerThread;
|
||||||
|
private ApplicationMasterProtocol rmProxy;
|
||||||
|
private ApplicationId applicationId;
|
||||||
|
private ApplicationAttemptId attemptId;
|
||||||
|
private String submitter;
|
||||||
|
private String appNameSuffix;
|
||||||
|
private Configuration conf;
|
||||||
|
private String queueName;
|
||||||
|
private UserGroupInformation userUgi;
|
||||||
|
private RegisterApplicationMasterRequest registerRequest;
|
||||||
|
private int lastResponseId;
|
||||||
|
private ApplicationClientProtocol rmClient;
|
||||||
|
private long asyncApiPollIntervalMillis;
|
||||||
|
private RecordFactory recordFactory;
|
||||||
|
|
||||||
|
public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
|
||||||
|
String queueName, String submitter, String appNameSuffix) {
|
||||||
|
Preconditions.checkNotNull(conf, "Configuration cannot be null");
|
||||||
|
Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
|
||||||
|
Preconditions.checkNotNull(submitter, "App submitter cannot be null");
|
||||||
|
|
||||||
|
this.conf = conf;
|
||||||
|
this.applicationId = appId;
|
||||||
|
this.queueName = queueName;
|
||||||
|
this.submitter = submitter;
|
||||||
|
this.appNameSuffix = appNameSuffix;
|
||||||
|
this.handlerThread = new AMRequestHandlerThread();
|
||||||
|
this.requestQueue = new LinkedBlockingQueue<>();
|
||||||
|
this.rmProxy = null;
|
||||||
|
this.registerRequest = null;
|
||||||
|
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
|
||||||
|
this.asyncApiPollIntervalMillis = conf.getLong(
|
||||||
|
YarnConfiguration.
|
||||||
|
YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
|
||||||
|
YarnConfiguration.
|
||||||
|
DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers this {@link UnmanagedApplicationManager} with the resource
|
||||||
|
* manager.
|
||||||
|
*
|
||||||
|
* @param request the register request
|
||||||
|
* @return the register response
|
||||||
|
* @throws YarnException if register fails
|
||||||
|
* @throws IOException if register fails
|
||||||
|
*/
|
||||||
|
public RegisterApplicationMasterResponse createAndRegisterApplicationMaster(
|
||||||
|
RegisterApplicationMasterRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
// This need to be done first in this method, because it is used as an
|
||||||
|
// indication that this method is called (and perhaps blocked due to RM
|
||||||
|
// connection and not finished yet)
|
||||||
|
this.registerRequest = request;
|
||||||
|
|
||||||
|
// attemptId will be available after this call
|
||||||
|
UnmanagedAMIdentifier identifier =
|
||||||
|
initializeUnmanagedAM(this.applicationId);
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.userUgi = UserGroupInformation.createProxyUser(
|
||||||
|
identifier.getAttemptId().toString(),
|
||||||
|
UserGroupInformation.getCurrentUser());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Exception while trying to get current user", e);
|
||||||
|
throw new YarnRuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
|
||||||
|
this.userUgi, identifier.getToken());
|
||||||
|
|
||||||
|
LOG.info("Registering the Unmanaged application master {}", this.attemptId);
|
||||||
|
RegisterApplicationMasterResponse response =
|
||||||
|
this.rmProxy.registerApplicationMaster(this.registerRequest);
|
||||||
|
|
||||||
|
// Only when register succeed that we start the heartbeat thread
|
||||||
|
this.handlerThread.setUncaughtExceptionHandler(
|
||||||
|
new HeartBeatThreadUncaughtExceptionHandler());
|
||||||
|
this.handlerThread.setDaemon(true);
|
||||||
|
this.handlerThread.start();
|
||||||
|
|
||||||
|
this.lastResponseId = 0;
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unregisters from the resource manager and stops the request handler thread.
|
||||||
|
*
|
||||||
|
* @param request the finishApplicationMaster request
|
||||||
|
* @return the response
|
||||||
|
* @throws YarnException if finishAM call fails
|
||||||
|
* @throws IOException if finishAM call fails
|
||||||
|
*/
|
||||||
|
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||||
|
FinishApplicationMasterRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
|
||||||
|
this.handlerThread.shutdown();
|
||||||
|
|
||||||
|
if (this.rmProxy == null) {
|
||||||
|
if (this.registerRequest != null) {
|
||||||
|
// This is possible if the async registerApplicationMaster is still
|
||||||
|
// blocked and retrying. Return a dummy response in this case.
|
||||||
|
LOG.warn("Unmanaged AM still not successfully launched/registered yet."
|
||||||
|
+ " Stopping the UAM client thread anyways.");
|
||||||
|
return FinishApplicationMasterResponse.newInstance(false);
|
||||||
|
} else {
|
||||||
|
throw new YarnException("finishApplicationMaster should not "
|
||||||
|
+ "be called before createAndRegister");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
|
||||||
|
this.registerRequest, this.attemptId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Force kill the UAM.
|
||||||
|
*
|
||||||
|
* @return kill response
|
||||||
|
* @throws IOException if fails to create rmProxy
|
||||||
|
* @throws YarnException if force kill fails
|
||||||
|
*/
|
||||||
|
public KillApplicationResponse forceKillApplication()
|
||||||
|
throws IOException, YarnException {
|
||||||
|
KillApplicationRequest request =
|
||||||
|
KillApplicationRequest.newInstance(this.attemptId.getApplicationId());
|
||||||
|
|
||||||
|
this.handlerThread.shutdown();
|
||||||
|
|
||||||
|
if (this.rmClient == null) {
|
||||||
|
this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
|
||||||
|
UserGroupInformation.createRemoteUser(this.submitter), null);
|
||||||
|
}
|
||||||
|
return this.rmClient.forceKillApplication(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends the specified heart beat request to the resource manager and invokes
|
||||||
|
* the callback asynchronously with the response.
|
||||||
|
*
|
||||||
|
* @param request the allocate request
|
||||||
|
* @param callback the callback method for the request
|
||||||
|
* @throws YarnException if registerAM is not called yet
|
||||||
|
*/
|
||||||
|
public void allocateAsync(AllocateRequest request,
|
||||||
|
AsyncCallback<AllocateResponse> callback) throws YarnException {
|
||||||
|
try {
|
||||||
|
this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback));
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
// Should not happen as we have MAX_INT queue length
|
||||||
|
LOG.debug("Interrupted while waiting to put on response queue", ex);
|
||||||
|
}
|
||||||
|
// Two possible cases why the UAM is not successfully registered yet:
|
||||||
|
// 1. registerApplicationMaster is not called at all. Should throw here.
|
||||||
|
// 2. registerApplicationMaster is called but hasn't successfully returned.
|
||||||
|
//
|
||||||
|
// In case 2, we have already save the allocate request above, so if the
|
||||||
|
// registration succeed later, no request is lost.
|
||||||
|
if (this.rmProxy == null) {
|
||||||
|
if (this.registerRequest != null) {
|
||||||
|
LOG.info("Unmanaged AM still not successfully launched/registered yet."
|
||||||
|
+ " Saving the allocate request and send later.");
|
||||||
|
} else {
|
||||||
|
throw new YarnException(
|
||||||
|
"AllocateAsync should not be called before createAndRegister");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the application attempt id of the UAM.
|
||||||
|
*
|
||||||
|
* @return attempt id of the UAM
|
||||||
|
*/
|
||||||
|
public ApplicationAttemptId getAttemptId() {
|
||||||
|
return this.attemptId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns RM proxy for the specified protocol type. Unit test cases can
|
||||||
|
* override this method and return mock proxy instances.
|
||||||
|
*
|
||||||
|
* @param protocol protocal of the proxy
|
||||||
|
* @param config configuration
|
||||||
|
* @param user ugi for the proxy connection
|
||||||
|
* @param token token for the connection
|
||||||
|
* @param <T> type of the proxy
|
||||||
|
* @return the proxy instance
|
||||||
|
* @throws IOException if fails to create the proxy
|
||||||
|
*/
|
||||||
|
protected <T> T createRMProxy(Class<T> protocol, Configuration config,
|
||||||
|
UserGroupInformation user, Token<AMRMTokenIdentifier> token)
|
||||||
|
throws IOException {
|
||||||
|
return AMRMClientUtils.createRMProxy(config, protocol, user, token);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Launch and initialize an unmanaged AM. First, it creates a new application
|
||||||
|
* on the RM and negotiates a new attempt id. Then it waits for the RM
|
||||||
|
* application attempt state to reach YarnApplicationAttemptState.LAUNCHED
|
||||||
|
* after which it returns the AM-RM token and the attemptId.
|
||||||
|
*
|
||||||
|
* @param appId application id
|
||||||
|
* @return the UAM identifier
|
||||||
|
* @throws IOException if initialize fails
|
||||||
|
* @throws YarnException if initialize fails
|
||||||
|
*/
|
||||||
|
protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId)
|
||||||
|
throws IOException, YarnException {
|
||||||
|
try {
|
||||||
|
UserGroupInformation appSubmitter =
|
||||||
|
UserGroupInformation.createRemoteUser(this.submitter);
|
||||||
|
this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
|
||||||
|
appSubmitter, null);
|
||||||
|
|
||||||
|
// Submit the application
|
||||||
|
submitUnmanagedApp(appId);
|
||||||
|
|
||||||
|
// Monitor the application attempt to wait for launch state
|
||||||
|
ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId,
|
||||||
|
EnumSet.of(YarnApplicationState.ACCEPTED,
|
||||||
|
YarnApplicationState.RUNNING, YarnApplicationState.KILLED,
|
||||||
|
YarnApplicationState.FAILED, YarnApplicationState.FINISHED),
|
||||||
|
YarnApplicationAttemptState.LAUNCHED);
|
||||||
|
this.attemptId = attemptReport.getApplicationAttemptId();
|
||||||
|
return getUAMIdentifier();
|
||||||
|
} finally {
|
||||||
|
this.rmClient = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void submitUnmanagedApp(ApplicationId appId)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
SubmitApplicationRequest submitRequest =
|
||||||
|
this.recordFactory.newRecordInstance(SubmitApplicationRequest.class);
|
||||||
|
|
||||||
|
ApplicationSubmissionContext context = this.recordFactory
|
||||||
|
.newRecordInstance(ApplicationSubmissionContext.class);
|
||||||
|
|
||||||
|
context.setApplicationId(appId);
|
||||||
|
context.setApplicationName(APP_NAME + "-" + appNameSuffix);
|
||||||
|
if (StringUtils.isBlank(this.queueName)) {
|
||||||
|
context.setQueue(this.conf.get(DEFAULT_QUEUE_CONFIG,
|
||||||
|
YarnConfiguration.DEFAULT_QUEUE_NAME));
|
||||||
|
} else {
|
||||||
|
context.setQueue(this.queueName);
|
||||||
|
}
|
||||||
|
|
||||||
|
ContainerLaunchContext amContainer =
|
||||||
|
this.recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
Resource resource = BuilderUtils.newResource(1024, 1);
|
||||||
|
context.setResource(resource);
|
||||||
|
context.setAMContainerSpec(amContainer);
|
||||||
|
submitRequest.setApplicationSubmissionContext(context);
|
||||||
|
|
||||||
|
context.setUnmanagedAM(true);
|
||||||
|
|
||||||
|
LOG.info("Submitting unmanaged application {}", appId);
|
||||||
|
this.rmClient.submitApplication(submitRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Monitor the submitted application and attempt until it reaches certain
|
||||||
|
* states.
|
||||||
|
*
|
||||||
|
* @param appId Application Id of application to be monitored
|
||||||
|
* @param appStates acceptable application state
|
||||||
|
* @param attemptState acceptable application attempt state
|
||||||
|
* @return the application report
|
||||||
|
* @throws YarnException if getApplicationReport fails
|
||||||
|
* @throws IOException if getApplicationReport fails
|
||||||
|
*/
|
||||||
|
private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId,
|
||||||
|
Set<YarnApplicationState> appStates,
|
||||||
|
YarnApplicationAttemptState attemptState)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
ApplicationAttemptId appAttemptId = null;
|
||||||
|
while (true) {
|
||||||
|
if (appAttemptId == null) {
|
||||||
|
// Get application report for the appId we are interested in
|
||||||
|
ApplicationReport report = getApplicationReport(appId);
|
||||||
|
YarnApplicationState state = report.getYarnApplicationState();
|
||||||
|
if (appStates.contains(state)) {
|
||||||
|
if (state != YarnApplicationState.ACCEPTED) {
|
||||||
|
throw new YarnRuntimeException(
|
||||||
|
"Received non-accepted application state: " + state
|
||||||
|
+ ". Application " + appId + " not the first attempt?");
|
||||||
|
}
|
||||||
|
appAttemptId =
|
||||||
|
getApplicationReport(appId).getCurrentApplicationAttemptId();
|
||||||
|
} else {
|
||||||
|
LOG.info("Current application state of {} is {}, will retry later.",
|
||||||
|
appId, state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (appAttemptId != null) {
|
||||||
|
GetApplicationAttemptReportRequest req = this.recordFactory
|
||||||
|
.newRecordInstance(GetApplicationAttemptReportRequest.class);
|
||||||
|
req.setApplicationAttemptId(appAttemptId);
|
||||||
|
ApplicationAttemptReport attemptReport = this.rmClient
|
||||||
|
.getApplicationAttemptReport(req).getApplicationAttemptReport();
|
||||||
|
if (attemptState
|
||||||
|
.equals(attemptReport.getYarnApplicationAttemptState())) {
|
||||||
|
return attemptReport;
|
||||||
|
}
|
||||||
|
LOG.info("Current attempt state of " + appAttemptId + " is "
|
||||||
|
+ attemptReport.getYarnApplicationAttemptState()
|
||||||
|
+ ", waiting for current attempt to reach " + attemptState);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(this.asyncApiPollIntervalMillis);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.warn("Interrupted while waiting for current attempt of " + appId
|
||||||
|
+ " to reach " + attemptState);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) {
|
||||||
|
throw new RuntimeException("Timeout for waiting current attempt of "
|
||||||
|
+ appId + " to reach " + attemptState);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the identifier of the unmanaged AM.
|
||||||
|
*
|
||||||
|
* @return the identifier of the unmanaged AM.
|
||||||
|
* @throws IOException if getApplicationReport fails
|
||||||
|
* @throws YarnException if getApplicationReport fails
|
||||||
|
*/
|
||||||
|
protected UnmanagedAMIdentifier getUAMIdentifier()
|
||||||
|
throws IOException, YarnException {
|
||||||
|
Token<AMRMTokenIdentifier> token = null;
|
||||||
|
org.apache.hadoop.yarn.api.records.Token amrmToken =
|
||||||
|
getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken();
|
||||||
|
if (amrmToken != null) {
|
||||||
|
token = ConverterUtils.convertFromYarn(amrmToken, (Text) null);
|
||||||
|
} else {
|
||||||
|
LOG.warn(
|
||||||
|
"AMRMToken not found in the application report for application: {}",
|
||||||
|
this.attemptId.getApplicationId());
|
||||||
|
}
|
||||||
|
return new UnmanagedAMIdentifier(this.attemptId, token);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationReport getApplicationReport(ApplicationId appId)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
GetApplicationReportRequest request =
|
||||||
|
this.recordFactory.newRecordInstance(GetApplicationReportRequest.class);
|
||||||
|
request.setApplicationId(appId);
|
||||||
|
return this.rmClient.getApplicationReport(request).getApplicationReport();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Data structure that encapsulates the application attempt identifier and the
|
||||||
|
* AMRMTokenIdentifier. Make it public because clients with HA need it.
|
||||||
|
*/
|
||||||
|
public static class UnmanagedAMIdentifier {
|
||||||
|
private ApplicationAttemptId attemptId;
|
||||||
|
private Token<AMRMTokenIdentifier> token;
|
||||||
|
|
||||||
|
public UnmanagedAMIdentifier(ApplicationAttemptId attemptId,
|
||||||
|
Token<AMRMTokenIdentifier> token) {
|
||||||
|
this.attemptId = attemptId;
|
||||||
|
this.token = token;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ApplicationAttemptId getAttemptId() {
|
||||||
|
return this.attemptId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Token<AMRMTokenIdentifier> getToken() {
|
||||||
|
return this.token;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Data structure that encapsulates AllocateRequest and AsyncCallback
|
||||||
|
* instance.
|
||||||
|
*/
|
||||||
|
public static class AsyncAllocateRequestInfo {
|
||||||
|
private AllocateRequest request;
|
||||||
|
private AsyncCallback<AllocateResponse> callback;
|
||||||
|
|
||||||
|
public AsyncAllocateRequestInfo(AllocateRequest request,
|
||||||
|
AsyncCallback<AllocateResponse> callback) {
|
||||||
|
Preconditions.checkArgument(request != null,
|
||||||
|
"AllocateRequest cannot be null");
|
||||||
|
Preconditions.checkArgument(callback != null, "Callback cannot be null");
|
||||||
|
|
||||||
|
this.request = request;
|
||||||
|
this.callback = callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AsyncCallback<AllocateResponse> getCallback() {
|
||||||
|
return this.callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AllocateRequest getRequest() {
|
||||||
|
return this.request;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getRequestQueueSize() {
|
||||||
|
return this.requestQueue.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extends Thread and provides an implementation that is used for processing
|
||||||
|
* the AM heart beat request asynchronously and sending back the response
|
||||||
|
* using the callback method registered with the system.
|
||||||
|
*/
|
||||||
|
public class AMRequestHandlerThread extends Thread {
|
||||||
|
|
||||||
|
// Indication flag for the thread to keep running
|
||||||
|
private volatile boolean keepRunning;
|
||||||
|
|
||||||
|
public AMRequestHandlerThread() {
|
||||||
|
super("UnmanagedApplicationManager Heartbeat Handler Thread");
|
||||||
|
this.keepRunning = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown the thread.
|
||||||
|
*/
|
||||||
|
public void shutdown() {
|
||||||
|
this.keepRunning = false;
|
||||||
|
this.interrupt();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (keepRunning) {
|
||||||
|
AsyncAllocateRequestInfo requestInfo;
|
||||||
|
try {
|
||||||
|
requestInfo = requestQueue.take();
|
||||||
|
if (requestInfo == null) {
|
||||||
|
throw new YarnException(
|
||||||
|
"Null requestInfo taken from request queue");
|
||||||
|
}
|
||||||
|
if (!keepRunning) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// change the response id before forwarding the allocate request as we
|
||||||
|
// could have different values for each UAM
|
||||||
|
AllocateRequest request = requestInfo.getRequest();
|
||||||
|
if (request == null) {
|
||||||
|
throw new YarnException("Null allocateRequest from requestInfo");
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
|
||||||
|
+ ((request.getAskList() == null) ? " empty"
|
||||||
|
: request.getAskList().size()));
|
||||||
|
}
|
||||||
|
|
||||||
|
request.setResponseId(lastResponseId);
|
||||||
|
AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
|
||||||
|
request, rmProxy, registerRequest, attemptId);
|
||||||
|
if (response == null) {
|
||||||
|
throw new YarnException("Null allocateResponse from allocate");
|
||||||
|
}
|
||||||
|
|
||||||
|
lastResponseId = response.getResponseId();
|
||||||
|
// update token if RM has reissued/renewed
|
||||||
|
if (response.getAMRMToken() != null) {
|
||||||
|
LOG.debug("Received new AMRMToken");
|
||||||
|
YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(),
|
||||||
|
userUgi, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Received Heartbeat reply from RM. Allocated Containers:"
|
||||||
|
+ ((response.getAllocatedContainers() == null) ? " empty"
|
||||||
|
: response.getAllocatedContainers().size()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (requestInfo.getCallback() == null) {
|
||||||
|
throw new YarnException("Null callback from requestInfo");
|
||||||
|
}
|
||||||
|
requestInfo.getCallback().callback(response);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Interrupted while waiting for queue", ex);
|
||||||
|
}
|
||||||
|
} catch (IOException ex) {
|
||||||
|
LOG.warn(
|
||||||
|
"IO Error occurred while processing heart beat for " + attemptId,
|
||||||
|
ex);
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
LOG.warn(
|
||||||
|
"Error occurred while processing heart beat for " + attemptId,
|
||||||
|
ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("UnmanagedApplicationManager has been stopped for {}. "
|
||||||
|
+ "AMRequestHandlerThread thread is exiting", attemptId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Uncaught exception handler for the background heartbeat thread.
|
||||||
|
*/
|
||||||
|
protected class HeartBeatThreadUncaughtExceptionHandler
|
||||||
|
implements UncaughtExceptionHandler {
|
||||||
|
@Override
|
||||||
|
public void uncaughtException(Thread t, Throwable e) {
|
||||||
|
LOG.error("Heartbeat thread {} for application attempt {} crashed!",
|
||||||
|
t.getName(), attemptId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.uam;
|
|
@ -0,0 +1,189 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.utils;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.security.SaslRpcServer;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class for AMRMClient.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public final class AMRMClientUtils {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(AMRMClientUtils.class);
|
||||||
|
|
||||||
|
public static final String APP_ALREADY_REGISTERED_MESSAGE =
|
||||||
|
"Application Master is already registered : ";
|
||||||
|
|
||||||
|
private AMRMClientUtils() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle ApplicationNotRegistered exception and re-register.
|
||||||
|
*
|
||||||
|
* @param attemptId app attemptId
|
||||||
|
* @param rmProxy RM proxy instance
|
||||||
|
* @param registerRequest the AM re-register request
|
||||||
|
* @throws YarnException if re-register fails
|
||||||
|
*/
|
||||||
|
public static void handleNotRegisteredExceptionAndReRegister(
|
||||||
|
ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy,
|
||||||
|
RegisterApplicationMasterRequest registerRequest) throws YarnException {
|
||||||
|
LOG.info("App attempt {} not registered, most likely due to RM failover. "
|
||||||
|
+ " Trying to re-register.", attemptId);
|
||||||
|
try {
|
||||||
|
rmProxy.registerApplicationMaster(registerRequest);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (e instanceof InvalidApplicationMasterRequestException
|
||||||
|
&& e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) {
|
||||||
|
LOG.info("Concurrent thread successfully registered, moving on.");
|
||||||
|
} else {
|
||||||
|
LOG.error("Error trying to re-register AM", e);
|
||||||
|
throw new YarnException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method for client calling ApplicationMasterProtocol.allocate that
|
||||||
|
* handles re-register if RM fails over.
|
||||||
|
*
|
||||||
|
* @param request allocate request
|
||||||
|
* @param rmProxy RM proxy
|
||||||
|
* @param registerRequest the register request for re-register
|
||||||
|
* @param attemptId application attempt id
|
||||||
|
* @return allocate response
|
||||||
|
* @throws YarnException if RM call fails
|
||||||
|
* @throws IOException if RM call fails
|
||||||
|
*/
|
||||||
|
public static AllocateResponse allocateWithReRegister(AllocateRequest request,
|
||||||
|
ApplicationMasterProtocol rmProxy,
|
||||||
|
RegisterApplicationMasterRequest registerRequest,
|
||||||
|
ApplicationAttemptId attemptId) throws YarnException, IOException {
|
||||||
|
try {
|
||||||
|
return rmProxy.allocate(request);
|
||||||
|
} catch (ApplicationMasterNotRegisteredException e) {
|
||||||
|
handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
|
||||||
|
registerRequest);
|
||||||
|
// reset responseId after re-register
|
||||||
|
request.setResponseId(0);
|
||||||
|
// retry allocate
|
||||||
|
return allocateWithReRegister(request, rmProxy, registerRequest,
|
||||||
|
attemptId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method for client calling
|
||||||
|
* ApplicationMasterProtocol.finishApplicationMaster that handles re-register
|
||||||
|
* if RM fails over.
|
||||||
|
*
|
||||||
|
* @param request finishApplicationMaster request
|
||||||
|
* @param rmProxy RM proxy
|
||||||
|
* @param registerRequest the register request for re-register
|
||||||
|
* @param attemptId application attempt id
|
||||||
|
* @return finishApplicationMaster response
|
||||||
|
* @throws YarnException if RM call fails
|
||||||
|
* @throws IOException if RM call fails
|
||||||
|
*/
|
||||||
|
public static FinishApplicationMasterResponse finishAMWithReRegister(
|
||||||
|
FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
|
||||||
|
RegisterApplicationMasterRequest registerRequest,
|
||||||
|
ApplicationAttemptId attemptId) throws YarnException, IOException {
|
||||||
|
try {
|
||||||
|
return rmProxy.finishApplicationMaster(request);
|
||||||
|
} catch (ApplicationMasterNotRegisteredException ex) {
|
||||||
|
handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
|
||||||
|
registerRequest);
|
||||||
|
// retry finishAM after re-register
|
||||||
|
return finishAMWithReRegister(request, rmProxy, registerRequest,
|
||||||
|
attemptId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a proxy for the specified protocol.
|
||||||
|
*
|
||||||
|
* @param configuration Configuration to generate {@link ClientRMProxy}
|
||||||
|
* @param protocol Protocol for the proxy
|
||||||
|
* @param user the user on whose behalf the proxy is being created
|
||||||
|
* @param token the auth token to use for connection
|
||||||
|
* @param <T> Type information of the proxy
|
||||||
|
* @return Proxy to the RM
|
||||||
|
* @throws IOException on failure
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public static <T> T createRMProxy(final Configuration configuration,
|
||||||
|
final Class<T> protocol, UserGroupInformation user,
|
||||||
|
final Token<? extends TokenIdentifier> token) throws IOException {
|
||||||
|
try {
|
||||||
|
String rmClusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID,
|
||||||
|
YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
|
||||||
|
LOG.info("Creating RMProxy to RM {} for protocol {} for user {}",
|
||||||
|
rmClusterId, protocol.getSimpleName(), user);
|
||||||
|
if (token != null) {
|
||||||
|
token.setService(ClientRMProxy.getAMRMTokenService(configuration));
|
||||||
|
user.addToken(token);
|
||||||
|
setAuthModeInConf(configuration);
|
||||||
|
}
|
||||||
|
final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
|
||||||
|
@Override
|
||||||
|
public T run() throws Exception {
|
||||||
|
return ClientRMProxy.createRMProxy(configuration, protocol);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return proxyConnection;
|
||||||
|
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new YarnRuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setAuthModeInConf(Configuration conf) {
|
||||||
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
SaslRpcServer.AuthMethod.TOKEN.toString());
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,13 +23,16 @@ import java.nio.ByteBuffer;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
|
@ -42,8 +45,8 @@ import org.slf4j.LoggerFactory;
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
public final class YarnServerSecurityUtils {
|
public final class YarnServerSecurityUtils {
|
||||||
private static final Logger LOG = LoggerFactory
|
private static final Logger LOG =
|
||||||
.getLogger(YarnServerSecurityUtils.class);
|
LoggerFactory.getLogger(YarnServerSecurityUtils.class);
|
||||||
|
|
||||||
private YarnServerSecurityUtils() {
|
private YarnServerSecurityUtils() {
|
||||||
}
|
}
|
||||||
|
@ -55,8 +58,7 @@ public final class YarnServerSecurityUtils {
|
||||||
* @return the AMRMTokenIdentifier instance for the current user
|
* @return the AMRMTokenIdentifier instance for the current user
|
||||||
* @throws YarnException
|
* @throws YarnException
|
||||||
*/
|
*/
|
||||||
public static AMRMTokenIdentifier authorizeRequest()
|
public static AMRMTokenIdentifier authorizeRequest() throws YarnException {
|
||||||
throws YarnException {
|
|
||||||
|
|
||||||
UserGroupInformation remoteUgi;
|
UserGroupInformation remoteUgi;
|
||||||
try {
|
try {
|
||||||
|
@ -82,9 +84,8 @@ public final class YarnServerSecurityUtils {
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
tokenFound = false;
|
tokenFound = false;
|
||||||
message =
|
message = "Got exception while looking for AMRMToken for user "
|
||||||
"Got exception while looking for AMRMToken for user "
|
+ remoteUgi.getUserName();
|
||||||
+ remoteUgi.getUserName();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!tokenFound) {
|
if (!tokenFound) {
|
||||||
|
@ -112,9 +113,30 @@ public final class YarnServerSecurityUtils {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the new AMRMToken into the ugi used for RM proxy.
|
||||||
|
*
|
||||||
|
* @param token the new AMRMToken sent by RM
|
||||||
|
* @param user ugi used for RM proxy
|
||||||
|
* @param conf configuration
|
||||||
|
*/
|
||||||
|
public static void updateAMRMToken(
|
||||||
|
org.apache.hadoop.yarn.api.records.Token token, UserGroupInformation user,
|
||||||
|
Configuration conf) {
|
||||||
|
Token<AMRMTokenIdentifier> amrmToken = new Token<AMRMTokenIdentifier>(
|
||||||
|
token.getIdentifier().array(), token.getPassword().array(),
|
||||||
|
new Text(token.getKind()), new Text(token.getService()));
|
||||||
|
// Preserve the token service sent by the RM when adding the token
|
||||||
|
// to ensure we replace the previous token setup by the RM.
|
||||||
|
// Afterwards we can update the service address for the RPC layer.
|
||||||
|
user.addToken(amrmToken);
|
||||||
|
amrmToken.setService(ClientRMProxy.getAMRMTokenService(conf));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parses the container launch context and returns a Credential instance that
|
* Parses the container launch context and returns a Credential instance that
|
||||||
* contains all the tokens from the launch context.
|
* contains all the tokens from the launch context.
|
||||||
|
*
|
||||||
* @param launchContext
|
* @param launchContext
|
||||||
* @return the credential instance
|
* @return the credential instance
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
@ -130,8 +152,7 @@ public final class YarnServerSecurityUtils {
|
||||||
buf.reset(tokens);
|
buf.reset(tokens);
|
||||||
credentials.readTokenStorageStream(buf);
|
credentials.readTokenStorageStream(buf);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
for (Token<? extends TokenIdentifier> tk : credentials
|
for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
|
||||||
.getAllTokens()) {
|
|
||||||
LOG.debug(tk.getService() + " = " + tk.toString());
|
LOG.debug(tk.getService() + " = " + tk.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@ -116,6 +117,8 @@ import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||||
|
@ -145,11 +148,11 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -171,12 +174,25 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
||||||
private AtomicInteger containerIndex = new AtomicInteger(0);
|
private AtomicInteger containerIndex = new AtomicInteger(0);
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
|
||||||
|
private boolean shouldReRegisterNext = false;
|
||||||
|
|
||||||
|
// For unit test synchronization
|
||||||
|
private static Object syncObj = new Object();
|
||||||
|
|
||||||
|
public static Object getSyncObj() {
|
||||||
|
return syncObj;
|
||||||
|
}
|
||||||
|
|
||||||
public MockResourceManagerFacade(Configuration conf,
|
public MockResourceManagerFacade(Configuration conf,
|
||||||
int startContainerIndex) {
|
int startContainerIndex) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.containerIndex.set(startContainerIndex);
|
this.containerIndex.set(startContainerIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setShouldReRegisterNext() {
|
||||||
|
shouldReRegisterNext = true;
|
||||||
|
}
|
||||||
|
|
||||||
private static String getAppIdentifier() throws IOException {
|
private static String getAppIdentifier() throws IOException {
|
||||||
AMRMTokenIdentifier result = null;
|
AMRMTokenIdentifier result = null;
|
||||||
UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
|
UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
|
||||||
|
@ -197,14 +213,31 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
||||||
String amrmToken = getAppIdentifier();
|
String amrmToken = getAppIdentifier();
|
||||||
LOG.info("Registering application attempt: " + amrmToken);
|
LOG.info("Registering application attempt: " + amrmToken);
|
||||||
|
|
||||||
|
shouldReRegisterNext = false;
|
||||||
|
|
||||||
|
synchronized (syncObj) {
|
||||||
|
syncObj.notifyAll();
|
||||||
|
// We reuse the port number to indicate whether the unit test want us to
|
||||||
|
// wait here
|
||||||
|
if (request.getRpcPort() > 1000) {
|
||||||
|
LOG.info("Register call in RM start waiting");
|
||||||
|
try {
|
||||||
|
syncObj.wait();
|
||||||
|
LOG.info("Register call in RM wait finished");
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.info("Register call in RM wait interrupted", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
synchronized (applicationContainerIdMap) {
|
synchronized (applicationContainerIdMap) {
|
||||||
Assert.assertFalse(
|
if (applicationContainerIdMap.containsKey(amrmToken)) {
|
||||||
"The application id is already registered: " + amrmToken,
|
throw new InvalidApplicationMasterRequestException(
|
||||||
applicationContainerIdMap.containsKey(amrmToken));
|
AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
|
||||||
|
}
|
||||||
// Keep track of the containers that are returned to this application
|
// Keep track of the containers that are returned to this application
|
||||||
applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
|
applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
|
||||||
}
|
}
|
||||||
|
|
||||||
return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
|
return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
|
||||||
null, request.getHost(), null);
|
null, request.getHost(), null);
|
||||||
}
|
}
|
||||||
|
@ -216,6 +249,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
||||||
String amrmToken = getAppIdentifier();
|
String amrmToken = getAppIdentifier();
|
||||||
LOG.info("Finishing application attempt: " + amrmToken);
|
LOG.info("Finishing application attempt: " + amrmToken);
|
||||||
|
|
||||||
|
if (shouldReRegisterNext) {
|
||||||
|
String message = "AM is not registered, should re-register.";
|
||||||
|
LOG.warn(message);
|
||||||
|
throw new ApplicationMasterNotRegisteredException(message);
|
||||||
|
}
|
||||||
|
|
||||||
synchronized (applicationContainerIdMap) {
|
synchronized (applicationContainerIdMap) {
|
||||||
// Remove the containers that were being tracked for this application
|
// Remove the containers that were being tracked for this application
|
||||||
Assert.assertTrue("The application id is NOT registered: " + amrmToken,
|
Assert.assertTrue("The application id is NOT registered: " + amrmToken,
|
||||||
|
@ -251,6 +290,13 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
||||||
}
|
}
|
||||||
|
|
||||||
String amrmToken = getAppIdentifier();
|
String amrmToken = getAppIdentifier();
|
||||||
|
LOG.info("Allocate from application attempt: " + amrmToken);
|
||||||
|
|
||||||
|
if (shouldReRegisterNext) {
|
||||||
|
String message = "AM is not registered, should re-register.";
|
||||||
|
LOG.warn(message);
|
||||||
|
throw new ApplicationMasterNotRegisteredException(message);
|
||||||
|
}
|
||||||
|
|
||||||
ArrayList<Container> containerList = new ArrayList<Container>();
|
ArrayList<Container> containerList = new ArrayList<Container>();
|
||||||
if (request.getAskList() != null) {
|
if (request.getAskList() != null) {
|
||||||
|
@ -384,6 +430,33 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
||||||
@Override
|
@Override
|
||||||
public KillApplicationResponse forceKillApplication(
|
public KillApplicationResponse forceKillApplication(
|
||||||
KillApplicationRequest request) throws YarnException, IOException {
|
KillApplicationRequest request) throws YarnException, IOException {
|
||||||
|
String appId = "";
|
||||||
|
boolean foundApp = false;
|
||||||
|
if (request.getApplicationId() != null) {
|
||||||
|
appId = request.getApplicationId().toString();
|
||||||
|
synchronized (applicationContainerIdMap) {
|
||||||
|
for (Entry<String, List<ContainerId>> entry : applicationContainerIdMap
|
||||||
|
.entrySet()) {
|
||||||
|
ApplicationAttemptId attemptId =
|
||||||
|
ApplicationAttemptId.fromString(entry.getKey());
|
||||||
|
if (attemptId.getApplicationId().equals(request.getApplicationId())) {
|
||||||
|
// Remove the apptempt and the containers that were being tracked
|
||||||
|
List<ContainerId> ids =
|
||||||
|
applicationContainerIdMap.remove(entry.getKey());
|
||||||
|
if (ids != null) {
|
||||||
|
for (ContainerId c : ids) {
|
||||||
|
allocatedContainerMap.remove(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
foundApp = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!foundApp) {
|
||||||
|
throw new YarnException("The application id is NOT registered: " + appId);
|
||||||
|
}
|
||||||
|
LOG.info("Force killing application: " + appId);
|
||||||
return KillApplicationResponse.newInstance(true);
|
return KillApplicationResponse.newInstance(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,335 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.uam;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
|
||||||
|
import org.apache.hadoop.yarn.util.AsyncCallback;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit test for UnmanagedApplicationManager.
|
||||||
|
*/
|
||||||
|
public class TestUnmanagedApplicationManager {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestUnmanagedApplicationManager.class);
|
||||||
|
|
||||||
|
private TestableUnmanagedApplicationManager uam;
|
||||||
|
private Configuration conf = new YarnConfiguration();
|
||||||
|
private CountingCallback callback;
|
||||||
|
|
||||||
|
private ApplicationAttemptId attemptId;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
conf.set(YarnConfiguration.RM_CLUSTER_ID, "subclusterId");
|
||||||
|
callback = new CountingCallback();
|
||||||
|
|
||||||
|
attemptId =
|
||||||
|
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
|
||||||
|
|
||||||
|
uam = new TestableUnmanagedApplicationManager(conf,
|
||||||
|
attemptId.getApplicationId(), null, "submitter", "appNameSuffix");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void waitForCallBackCountAndCheckZeroPending(
|
||||||
|
CountingCallback callBack, int expectCallBackCount) {
|
||||||
|
synchronized (callBack) {
|
||||||
|
while (callBack.callBackCount != expectCallBackCount) {
|
||||||
|
try {
|
||||||
|
callBack.wait();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertEquals(
|
||||||
|
"Non zero pending requests when number of allocate callbacks reaches "
|
||||||
|
+ expectCallBackCount,
|
||||||
|
0, callBack.requestQueueSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 5000)
|
||||||
|
public void testBasicUsage()
|
||||||
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
|
||||||
|
createAndRegisterApplicationMaster(
|
||||||
|
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
|
||||||
|
|
||||||
|
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
|
||||||
|
attemptId);
|
||||||
|
|
||||||
|
// Wait for outstanding async allocate callback
|
||||||
|
waitForCallBackCountAndCheckZeroPending(callback, 1);
|
||||||
|
|
||||||
|
finishApplicationMaster(
|
||||||
|
FinishApplicationMasterRequest.newInstance(null, null, null),
|
||||||
|
attemptId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 5000)
|
||||||
|
public void testReRegister()
|
||||||
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
|
||||||
|
createAndRegisterApplicationMaster(
|
||||||
|
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
|
||||||
|
|
||||||
|
uam.setShouldReRegisterNext();
|
||||||
|
|
||||||
|
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
|
||||||
|
attemptId);
|
||||||
|
|
||||||
|
// Wait for outstanding async allocate callback
|
||||||
|
waitForCallBackCountAndCheckZeroPending(callback, 1);
|
||||||
|
|
||||||
|
uam.setShouldReRegisterNext();
|
||||||
|
|
||||||
|
finishApplicationMaster(
|
||||||
|
FinishApplicationMasterRequest.newInstance(null, null, null),
|
||||||
|
attemptId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If register is slow, async allocate requests in the meanwhile should not
|
||||||
|
* throw or be dropped.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 5000)
|
||||||
|
public void testSlowRegisterCall()
|
||||||
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
|
||||||
|
// Register with wait() in RM in a separate thread
|
||||||
|
Thread registerAMThread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
createAndRegisterApplicationMaster(
|
||||||
|
RegisterApplicationMasterRequest.newInstance(null, 1001, null),
|
||||||
|
attemptId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Register thread exception", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Sync obj from mock RM
|
||||||
|
Object syncObj = MockResourceManagerFacade.getSyncObj();
|
||||||
|
|
||||||
|
// Wait for register call in the thread get into RM and then wake us
|
||||||
|
synchronized (syncObj) {
|
||||||
|
LOG.info("Starting register thread");
|
||||||
|
registerAMThread.start();
|
||||||
|
try {
|
||||||
|
LOG.info("Test main starts waiting");
|
||||||
|
syncObj.wait();
|
||||||
|
LOG.info("Test main wait finished");
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Test main wait interrupted", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// First allocate before register succeeds
|
||||||
|
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
|
||||||
|
attemptId);
|
||||||
|
|
||||||
|
// Notify the register thread
|
||||||
|
synchronized (syncObj) {
|
||||||
|
syncObj.notifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Test main wait for register thread to finish");
|
||||||
|
registerAMThread.join();
|
||||||
|
LOG.info("Register thread finished");
|
||||||
|
|
||||||
|
// Second allocate, normal case
|
||||||
|
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
|
||||||
|
attemptId);
|
||||||
|
|
||||||
|
// Both allocate before should respond
|
||||||
|
waitForCallBackCountAndCheckZeroPending(callback, 2);
|
||||||
|
|
||||||
|
finishApplicationMaster(
|
||||||
|
FinishApplicationMasterRequest.newInstance(null, null, null),
|
||||||
|
attemptId);
|
||||||
|
|
||||||
|
// Allocates after finishAM should be ignored
|
||||||
|
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
|
||||||
|
attemptId);
|
||||||
|
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
|
||||||
|
attemptId);
|
||||||
|
|
||||||
|
Assert.assertEquals(0, callback.requestQueueSize);
|
||||||
|
|
||||||
|
// A short wait just in case the allocates get executed
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(2, callback.callBackCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = Exception.class)
|
||||||
|
public void testAllocateWithoutRegister()
|
||||||
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
|
||||||
|
attemptId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = Exception.class)
|
||||||
|
public void testFinishWithoutRegister()
|
||||||
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
finishApplicationMaster(
|
||||||
|
FinishApplicationMasterRequest.newInstance(null, null, null),
|
||||||
|
attemptId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testForceKill()
|
||||||
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
createAndRegisterApplicationMaster(
|
||||||
|
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
|
||||||
|
uam.forceKillApplication();
|
||||||
|
|
||||||
|
try {
|
||||||
|
uam.forceKillApplication();
|
||||||
|
Assert.fail("Should fail because application is already killed");
|
||||||
|
} catch (YarnException t) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected UserGroupInformation getUGIWithToken(
|
||||||
|
ApplicationAttemptId appAttemptId) {
|
||||||
|
UserGroupInformation ugi =
|
||||||
|
UserGroupInformation.createRemoteUser(appAttemptId.toString());
|
||||||
|
AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1);
|
||||||
|
ugi.addTokenIdentifier(token);
|
||||||
|
return ugi;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RegisterApplicationMasterResponse
|
||||||
|
createAndRegisterApplicationMaster(
|
||||||
|
final RegisterApplicationMasterRequest request,
|
||||||
|
ApplicationAttemptId appAttemptId)
|
||||||
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
return getUGIWithToken(appAttemptId).doAs(
|
||||||
|
new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
|
||||||
|
@Override
|
||||||
|
public RegisterApplicationMasterResponse run()
|
||||||
|
throws YarnException, IOException {
|
||||||
|
RegisterApplicationMasterResponse response =
|
||||||
|
uam.createAndRegisterApplicationMaster(request);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void allocateAsync(final AllocateRequest request,
|
||||||
|
final AsyncCallback<AllocateResponse> callBack,
|
||||||
|
ApplicationAttemptId appAttemptId)
|
||||||
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
|
@Override
|
||||||
|
public Object run() throws YarnException {
|
||||||
|
uam.allocateAsync(request, callBack);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected FinishApplicationMasterResponse finishApplicationMaster(
|
||||||
|
final FinishApplicationMasterRequest request,
|
||||||
|
ApplicationAttemptId appAttemptId)
|
||||||
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
return getUGIWithToken(appAttemptId)
|
||||||
|
.doAs(new PrivilegedExceptionAction<FinishApplicationMasterResponse>() {
|
||||||
|
@Override
|
||||||
|
public FinishApplicationMasterResponse run()
|
||||||
|
throws YarnException, IOException {
|
||||||
|
FinishApplicationMasterResponse response =
|
||||||
|
uam.finishApplicationMaster(request);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected class CountingCallback implements AsyncCallback<AllocateResponse> {
|
||||||
|
private int callBackCount;
|
||||||
|
private int requestQueueSize;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void callback(AllocateResponse response) {
|
||||||
|
synchronized (this) {
|
||||||
|
callBackCount++;
|
||||||
|
requestQueueSize = uam.getRequestQueueSize();
|
||||||
|
this.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Testable UnmanagedApplicationManager that talks to a mock RM.
|
||||||
|
*/
|
||||||
|
public static class TestableUnmanagedApplicationManager
|
||||||
|
extends UnmanagedApplicationManager {
|
||||||
|
|
||||||
|
private MockResourceManagerFacade rmProxy;
|
||||||
|
|
||||||
|
public TestableUnmanagedApplicationManager(Configuration conf,
|
||||||
|
ApplicationId appId, String queueName, String submitter,
|
||||||
|
String appNameSuffix) {
|
||||||
|
super(conf, appId, queueName, submitter, appNameSuffix);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Override
|
||||||
|
protected <T> T createRMProxy(final Class<T> protocol, Configuration config,
|
||||||
|
UserGroupInformation user, Token<AMRMTokenIdentifier> token) {
|
||||||
|
if (rmProxy == null) {
|
||||||
|
rmProxy = new MockResourceManagerFacade(config, 0);
|
||||||
|
}
|
||||||
|
return (T) rmProxy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setShouldReRegisterNext() {
|
||||||
|
if (rmProxy != null) {
|
||||||
|
rmProxy.setShouldReRegisterNext();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
|
||||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -48,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.ServerRMProxy;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -134,7 +134,8 @@ public final class DefaultRequestInterceptor extends
|
||||||
}
|
}
|
||||||
AllocateResponse allocateResponse = rmClient.allocate(request);
|
AllocateResponse allocateResponse = rmClient.allocate(request);
|
||||||
if (allocateResponse.getAMRMToken() != null) {
|
if (allocateResponse.getAMRMToken() != null) {
|
||||||
updateAMRMToken(allocateResponse.getAMRMToken());
|
YarnServerSecurityUtils.updateAMRMToken(allocateResponse.getAMRMToken(),
|
||||||
|
this.user, getConf());
|
||||||
}
|
}
|
||||||
|
|
||||||
return allocateResponse;
|
return allocateResponse;
|
||||||
|
@ -170,7 +171,9 @@ public final class DefaultRequestInterceptor extends
|
||||||
((DistributedSchedulingAMProtocol)rmClient)
|
((DistributedSchedulingAMProtocol)rmClient)
|
||||||
.allocateForDistributedScheduling(request);
|
.allocateForDistributedScheduling(request);
|
||||||
if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
|
if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
|
||||||
updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
|
YarnServerSecurityUtils.updateAMRMToken(
|
||||||
|
allocateResponse.getAllocateResponse().getAMRMToken(), this.user,
|
||||||
|
getConf());
|
||||||
}
|
}
|
||||||
return allocateResponse;
|
return allocateResponse;
|
||||||
} else {
|
} else {
|
||||||
|
@ -195,18 +198,6 @@ public final class DefaultRequestInterceptor extends
|
||||||
+ "Check if the interceptor pipeline configuration is correct");
|
+ "Check if the interceptor pipeline configuration is correct");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateAMRMToken(Token token) throws IOException {
|
|
||||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
|
|
||||||
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
|
|
||||||
token.getIdentifier().array(), token.getPassword().array(),
|
|
||||||
new Text(token.getKind()), new Text(token.getService()));
|
|
||||||
// Preserve the token service sent by the RM when adding the token
|
|
||||||
// to ensure we replace the previous token setup by the RM.
|
|
||||||
// Afterwards we can update the service address for the RPC layer.
|
|
||||||
user.addToken(amrmToken);
|
|
||||||
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void setRMClient(final ApplicationMasterProtocol rmClient) {
|
public void setRMClient(final ApplicationMasterProtocol rmClient) {
|
||||||
if (rmClient instanceof DistributedSchedulingAMProtocol) {
|
if (rmClient instanceof DistributedSchedulingAMProtocol) {
|
||||||
|
@ -257,18 +248,11 @@ public final class DefaultRequestInterceptor extends
|
||||||
for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : UserGroupInformation
|
for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : UserGroupInformation
|
||||||
.getCurrentUser().getTokens()) {
|
.getCurrentUser().getTokens()) {
|
||||||
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
|
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
|
||||||
token.setService(getAMRMTokenService(conf));
|
token.setService(ClientRMProxy.getAMRMTokenService(conf));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceStability.Unstable
|
|
||||||
public static Text getAMRMTokenService(Configuration conf) {
|
|
||||||
return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
|
||||||
}
|
|
||||||
|
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public static Text getTokenService(Configuration conf, String address,
|
public static Text getTokenService(Configuration conf, String address,
|
||||||
String defaultAddr, int defaultPort) {
|
String defaultAddr, int defaultPort) {
|
||||||
|
|
|
@ -211,15 +211,13 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
AllocateResponse lastResponse = lock.getAllocateResponse();
|
AllocateResponse lastResponse = lock.getAllocateResponse();
|
||||||
if (hasApplicationMasterRegistered(applicationAttemptId)) {
|
if (hasApplicationMasterRegistered(applicationAttemptId)) {
|
||||||
String message =
|
String message = AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
|
||||||
"Application Master is already registered : "
|
|
||||||
+ appID;
|
|
||||||
LOG.warn(message);
|
LOG.warn(message);
|
||||||
RMAuditLogger.logFailure(
|
RMAuditLogger.logFailure(
|
||||||
this.rmContext.getRMApps()
|
this.rmContext.getRMApps()
|
||||||
.get(appID).getUser(),
|
.get(appID).getUser(),
|
||||||
AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
|
AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
|
||||||
appID, applicationAttemptId);
|
appID, applicationAttemptId);
|
||||||
throw new InvalidApplicationMasterRequestException(message);
|
throw new InvalidApplicationMasterRequestException(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
|
@ -338,9 +339,8 @@ public class TestApplicationMasterLauncher {
|
||||||
am.registerAppAttempt(false);
|
am.registerAppAttempt(false);
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Assert.assertEquals("Application Master is already registered : "
|
Assert.assertEquals(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE
|
||||||
+ attempt.getAppAttemptId().getApplicationId(),
|
+ attempt.getAppAttemptId().getApplicationId(), e.getMessage());
|
||||||
e.getMessage());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Simulate an AM that was disconnected and app attempt was removed
|
// Simulate an AM that was disconnected and app attempt was removed
|
||||||
|
|
Loading…
Reference in New Issue