YARN-6128. Add support for AMRMProxy HA. (Botong Huang via Subru).

This commit is contained in:
Subru Krishnan 2017-11-20 14:21:58 -08:00
parent ea8a121423
commit ed31091361
21 changed files with 1342 additions and 278 deletions

View File

@ -1948,6 +1948,9 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
"org.apache.hadoop.yarn.server.nodemanager.amrmproxy."
+ "DefaultRequestInterceptor";
public static final String AMRM_PROXY_HA_ENABLED = NM_PREFIX
+ "amrmproxy.ha.enable";
public static final boolean DEFAULT_AMRM_PROXY_HA_ENABLED = false;
/**
* Default platform-agnostic CLASSPATH for YARN applications. A
@ -2790,6 +2793,11 @@ public class YarnConfiguration extends Configuration {
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs";
public static final String FEDERATION_REGISTRY_BASE_KEY =
FEDERATION_PREFIX + "registry.base-dir";
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
"yarnfederation/";
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
@ -2947,6 +2955,11 @@ public class YarnConfiguration extends Configuration {
// Other Configs
////////////////////////////////
public static final String YARN_REGISTRY_CLASS =
YARN_PREFIX + "registry.class";
public static final String DEFAULT_YARN_REGISTRY_CLASS =
"org.apache.hadoop.registry.client.impl.FSRegistryOperationsService";
/**
* Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead.
* The interval of the yarn client's querying application state after

View File

@ -2815,7 +2815,20 @@
<value>300</value>
</property>
<property>
<description>The registry base directory for federation.</description>
<name>yarn.federation.registry.base-dir</name>
<value>yarnfederation/</value>
</property>
<!-- Other Configuration -->
<property>
<description>The registry implementation to use.</description>
<name>yarn.registry.class</name>
<value>org.apache.hadoop.registry.client.impl.FSRegistryOperationsService</value>
</property>
<property>
<description>The interval that the yarn client library uses to poll the
completion status of the asynchronous API of application client protocol.
@ -2976,6 +2989,14 @@
<value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
</property>
<property>
<description>
Whether AMRMProxy HA is enabled.
</description>
<name>yarn.nodemanager.amrmproxy.ha.enable</name>
<value>false</value>
</property>
<property>
<description>
Setting that controls whether distributed scheduling is enabled.

View File

@ -67,6 +67,11 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-registry</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>

View File

@ -0,0 +1,338 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.registry.client.api.BindFlags;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Helper class that handles reads and writes to Yarn Registry to support UAM HA
* and second attempt.
*/
public class FederationRegistryClient {
private static final Logger LOG =
LoggerFactory.getLogger(FederationRegistryClient.class);
private RegistryOperations registry;
private UserGroupInformation user;
// AppId -> SubClusterId -> UAM token
private Map<ApplicationId, Map<String, Token<AMRMTokenIdentifier>>>
appSubClusterTokenMap;
// Structure in registry: <registryBaseDir>/<AppId>/<SubClusterId> -> UAMToken
private String registryBaseDir;
public FederationRegistryClient(Configuration conf,
RegistryOperations registry, UserGroupInformation user) {
this.registry = registry;
this.user = user;
this.appSubClusterTokenMap = new ConcurrentHashMap<>();
this.registryBaseDir =
conf.get(YarnConfiguration.FEDERATION_REGISTRY_BASE_KEY,
YarnConfiguration.DEFAULT_FEDERATION_REGISTRY_BASE_KEY);
LOG.info("Using registry {} with base directory: {}",
this.registry.getClass().getName(), this.registryBaseDir);
}
/**
* Get the list of known applications in the registry.
*
* @return the list of known applications
*/
public List<String> getAllApplications() {
// Suppress the exception here because it is valid that the entry does not
// exist
List<String> applications = null;
try {
applications = listDirRegistry(this.registry, this.user,
getRegistryKey(null, null), false);
} catch (YarnException e) {
LOG.warn("Unexpected exception from listDirRegistry", e);
}
if (applications == null) {
// It is valid for listDirRegistry to return null
return new ArrayList<>();
}
return applications;
}
/**
* For testing, delete all application records in registry.
*/
@VisibleForTesting
public void cleanAllApplications() {
try {
removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null),
true, false);
} catch (YarnException e) {
LOG.warn("Unexpected exception from removeKeyRegistry", e);
}
}
/**
* Write/update the UAM token for an application and a sub-cluster.
*
* @param subClusterId sub-cluster id of the token
* @param token the UAM of the application
* @return whether the amrmToken is added or updated to a new value
*/
public boolean writeAMRMTokenForUAM(ApplicationId appId,
String subClusterId, Token<AMRMTokenIdentifier> token) {
Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
this.appSubClusterTokenMap.get(appId);
if (subClusterTokenMap == null) {
subClusterTokenMap = new ConcurrentHashMap<>();
this.appSubClusterTokenMap.put(appId, subClusterTokenMap);
}
boolean update = !token.equals(subClusterTokenMap.get(subClusterId));
if (!update) {
LOG.debug("Same amrmToken received from {}, skip writing registry for {}",
subClusterId, appId);
return update;
}
LOG.info("Writing/Updating amrmToken for {} to registry for {}",
subClusterId, appId);
try {
// First, write the token entry
writeRegistry(this.registry, this.user,
getRegistryKey(appId, subClusterId), token.encodeToUrlString(), true);
// Then update the subClusterTokenMap
subClusterTokenMap.put(subClusterId, token);
} catch (YarnException | IOException e) {
LOG.error(
"Failed writing AMRMToken to registry for subcluster " + subClusterId,
e);
}
return update;
}
/**
* Load the information of one application from registry.
*
* @param appId application id
* @return the sub-cluster to UAM token mapping
*/
public Map<String, Token<AMRMTokenIdentifier>>
loadStateFromRegistry(ApplicationId appId) {
Map<String, Token<AMRMTokenIdentifier>> retMap = new HashMap<>();
// Suppress the exception here because it is valid that the entry does not
// exist
List<String> subclusters = null;
try {
subclusters = listDirRegistry(this.registry, this.user,
getRegistryKey(appId, null), false);
} catch (YarnException e) {
LOG.warn("Unexpected exception from listDirRegistry", e);
}
if (subclusters == null) {
LOG.info("Application {} does not exist in registry", appId);
return retMap;
}
// Read the amrmToken for each sub-cluster with an existing UAM
for (String scId : subclusters) {
LOG.info("Reading amrmToken for subcluster {} for {}", scId, appId);
String key = getRegistryKey(appId, scId);
try {
String tokenString = readRegistry(this.registry, this.user, key, true);
if (tokenString == null) {
throw new YarnException("Null string from readRegistry key " + key);
}
Token<AMRMTokenIdentifier> amrmToken = new Token<>();
amrmToken.decodeFromUrlString(tokenString);
// Clear the service field, as if RM just issued the token
amrmToken.setService(new Text());
retMap.put(scId, amrmToken);
} catch (Exception e) {
LOG.error("Failed reading registry key " + key
+ ", skipping subcluster " + scId, e);
}
}
// Override existing map if there
this.appSubClusterTokenMap.put(appId, new ConcurrentHashMap<>(retMap));
return retMap;
}
/**
* Remove an application from registry.
*
* @param appId application id
*/
public void removeAppFromRegistry(ApplicationId appId) {
Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
this.appSubClusterTokenMap.get(appId);
LOG.info("Removing all registry entries for {}", appId);
if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) {
return;
}
// Lastly remove the application directory
String key = getRegistryKey(appId, null);
try {
removeKeyRegistry(this.registry, this.user, key, true, true);
subClusterTokenMap.clear();
} catch (YarnException e) {
LOG.error("Failed removing registry directory key " + key, e);
}
}
private String getRegistryKey(ApplicationId appId, String fileName) {
if (appId == null) {
return this.registryBaseDir;
}
if (fileName == null) {
return this.registryBaseDir + appId.toString();
}
return this.registryBaseDir + appId.toString() + "/" + fileName;
}
private String readRegistry(final RegistryOperations registryImpl,
UserGroupInformation ugi, final String key, final boolean throwIfFails)
throws YarnException {
// Use the ugi loaded with app credentials to access registry
String result = ugi.doAs(new PrivilegedAction<String>() {
@Override
public String run() {
try {
ServiceRecord value = registryImpl.resolve(key);
if (value != null) {
return value.description;
}
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry resolve key " + key + " failed", e);
}
}
return null;
}
});
if (result == null && throwIfFails) {
throw new YarnException("Registry resolve key " + key + " failed");
}
return result;
}
private void removeKeyRegistry(final RegistryOperations registryImpl,
UserGroupInformation ugi, final String key, final boolean recursive,
final boolean throwIfFails) throws YarnException {
// Use the ugi loaded with app credentials to access registry
boolean success = ugi.doAs(new PrivilegedAction<Boolean>() {
@Override
public Boolean run() {
try {
registryImpl.delete(key, recursive);
return true;
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry remove key " + key + " failed", e);
}
}
return false;
}
});
if (!success && throwIfFails) {
throw new YarnException("Registry remove key " + key + " failed");
}
}
/**
* Write registry entry, override if exists.
*/
private void writeRegistry(final RegistryOperations registryImpl,
UserGroupInformation ugi, final String key, final String value,
final boolean throwIfFails) throws YarnException {
final ServiceRecord recordValue = new ServiceRecord();
recordValue.description = value;
// Use the ugi loaded with app credentials to access registry
boolean success = ugi.doAs(new PrivilegedAction<Boolean>() {
@Override
public Boolean run() {
try {
registryImpl.bind(key, recordValue, BindFlags.OVERWRITE);
return true;
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry write key " + key + " failed", e);
}
}
return false;
}
});
if (!success && throwIfFails) {
throw new YarnException("Registry write key " + key + " failed");
}
}
/**
* List the sub directories in the given directory.
*/
private List<String> listDirRegistry(final RegistryOperations registryImpl,
UserGroupInformation ugi, final String key, final boolean throwIfFails)
throws YarnException {
List<String> result = ugi.doAs(new PrivilegedAction<List<String>>() {
@Override
public List<String> run() {
try {
return registryImpl.list(key);
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry list key " + key + " failed", e);
}
}
return null;
}
});
if (result == null && throwIfFails) {
throw new YarnException("Registry list key " + key + " failed");
}
return result;
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@ -44,9 +45,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.slf4j.Logger;
@ -67,7 +68,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
// Map from uamId to UAM instances
private Map<String, UnmanagedApplicationManager> unmanagedAppMasterMap;
private Map<String, ApplicationAttemptId> attemptIdMap;
private Map<String, ApplicationId> appIdMap;
private ExecutorService threadpool;
@ -82,7 +83,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
this.threadpool = Executors.newCachedThreadPool();
}
this.unmanagedAppMasterMap = new ConcurrentHashMap<>();
this.attemptIdMap = new ConcurrentHashMap<>();
this.appIdMap = new ConcurrentHashMap<>();
super.serviceStart();
}
@ -114,7 +115,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
public KillApplicationResponse call() throws Exception {
try {
LOG.info("Force-killing UAM id " + uamId + " for application "
+ attemptIdMap.get(uamId));
+ appIdMap.get(uamId));
return unmanagedAppMasterMap.remove(uamId).forceKillApplication();
} catch (Exception e) {
LOG.error("Failed to kill unmanaged application master", e);
@ -132,7 +133,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
LOG.error("Failed to kill unmanaged application master", e);
}
}
this.attemptIdMap.clear();
this.appIdMap.clear();
super.serviceStop();
}
@ -145,13 +146,18 @@ public class UnmanagedAMPoolManager extends AbstractService {
* @param queueName queue of the application
* @param submitter submitter name of the UAM
* @param appNameSuffix application name suffix for the UAM
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
* recovery.
* @see ApplicationSubmissionContext
* #setKeepContainersAcrossApplicationAttempts(boolean)
* @return uamId for the UAM
* @throws YarnException if registerApplicationMaster fails
* @throws IOException if registerApplicationMaster fails
*/
public String createAndRegisterNewUAM(
RegisterApplicationMasterRequest registerRequest, Configuration conf,
String queueName, String submitter, String appNameSuffix)
String queueName, String submitter, String appNameSuffix,
boolean keepContainersAcrossApplicationAttempts)
throws YarnException, IOException {
ApplicationId appId = null;
ApplicationClientProtocol rmClient;
@ -173,45 +179,52 @@ public class UnmanagedAMPoolManager extends AbstractService {
rmClient = null;
}
createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId,
queueName, submitter, appNameSuffix);
// Launch the UAM in RM
launchUAM(appId.toString(), conf, appId, queueName, submitter,
appNameSuffix, keepContainersAcrossApplicationAttempts);
// Register the UAM application
registerApplicationMaster(appId.toString(), registerRequest);
// Returns the appId as uamId
return appId.toString();
}
/**
* Create a new UAM and register the application, using the provided uamId and
* appId.
* Launch a new UAM, using the provided uamId and appId.
*
* @param uamId identifier for the UAM
* @param registerRequest RegisterApplicationMasterRequest
* @param uamId uam Id
* @param conf configuration for this UAM
* @param appId application id for the UAM
* @param queueName queue of the application
* @param submitter submitter name of the UAM
* @param appNameSuffix application name suffix for the UAM
* @return RegisterApplicationMasterResponse
* @throws YarnException if registerApplicationMaster fails
* @throws IOException if registerApplicationMaster fails
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
* recovery.
* @see ApplicationSubmissionContext
* #setKeepContainersAcrossApplicationAttempts(boolean)
* @return UAM token
* @throws YarnException if fails
* @throws IOException if fails
*/
public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId,
RegisterApplicationMasterRequest registerRequest, Configuration conf,
public Token<AMRMTokenIdentifier> launchUAM(String uamId, Configuration conf,
ApplicationId appId, String queueName, String submitter,
String appNameSuffix) throws YarnException, IOException {
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts)
throws YarnException, IOException {
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " already exists");
}
UnmanagedApplicationManager uam =
createUAM(conf, appId, queueName, submitter, appNameSuffix);
UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
// Put the UAM into map first before initializing it to avoid additional UAM
// for the same uamId being created concurrently
this.unmanagedAppMasterMap.put(uamId, uam);
RegisterApplicationMasterResponse response = null;
Token<AMRMTokenIdentifier> amrmToken = null;
try {
LOG.info("Creating and registering UAM id {} for application {}", uamId,
appId);
response = uam.createAndRegisterApplicationMaster(registerRequest);
LOG.info("Launching UAM id {} for application {}", uamId, appId);
amrmToken = uam.launchUAM();
} catch (Exception e) {
// Add the map earlier and remove here if register failed because we want
// to make sure there is only one uam instance per uamId at any given time
@ -219,8 +232,48 @@ public class UnmanagedAMPoolManager extends AbstractService {
throw e;
}
this.attemptIdMap.put(uamId, uam.getAttemptId());
return response;
this.appIdMap.put(uamId, uam.getAppId());
return amrmToken;
}
/**
* Re-attach to an existing UAM, using the provided uamIdentifier.
*
* @param uamId uam Id
* @param conf configuration for this UAM
* @param appId application id for the UAM
* @param queueName queue of the application
* @param submitter submitter name of the UAM
* @param appNameSuffix application name suffix for the UAM
* @param uamToken UAM token
* @throws YarnException if fails
* @throws IOException if fails
*/
public void reAttachUAM(String uamId, Configuration conf,
ApplicationId appId, String queueName, String submitter,
String appNameSuffix, Token<AMRMTokenIdentifier> uamToken)
throws YarnException, IOException {
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " already exists");
}
UnmanagedApplicationManager uam =
createUAM(conf, appId, queueName, submitter, appNameSuffix, true);
// Put the UAM into map first before initializing it to avoid additional UAM
// for the same uamId being created concurrently
this.unmanagedAppMasterMap.put(uamId, uam);
try {
LOG.info("Reattaching UAM id {} for application {}", uamId, appId);
uam.reAttachUAM(uamToken);
} catch (Exception e) {
// Add the map earlier and remove here if register failed because we want
// to make sure there is only one uam instance per uamId at any given time
this.unmanagedAppMasterMap.remove(uamId);
throw e;
}
this.appIdMap.put(uamId, uam.getAppId());
}
/**
@ -231,20 +284,42 @@ public class UnmanagedAMPoolManager extends AbstractService {
* @param queueName queue of the application
* @param submitter submitter name of the application
* @param appNameSuffix application name suffix
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
* @return the UAM instance
*/
@VisibleForTesting
protected UnmanagedApplicationManager createUAM(Configuration conf,
ApplicationId appId, String queueName, String submitter,
String appNameSuffix) {
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
return new UnmanagedApplicationManager(conf, appId, queueName, submitter,
appNameSuffix);
appNameSuffix, keepContainersAcrossApplicationAttempts);
}
/**
* Register application master for the UAM.
*
* @param uamId uam Id
* @param registerRequest RegisterApplicationMasterRequest
* @return register response
* @throws YarnException if register fails
* @throws IOException if register fails
*/
public RegisterApplicationMasterResponse registerApplicationMaster(
String uamId, RegisterApplicationMasterRequest registerRequest)
throws YarnException, IOException {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
LOG.info("Registering UAM id {} for application {}", uamId,
this.appIdMap.get(uamId));
return this.unmanagedAppMasterMap.get(uamId)
.registerApplicationMaster(registerRequest);
}
/**
* AllocateAsync to an UAM.
*
* @param uamId identifier for the UAM
* @param uamId uam Id
* @param request AllocateRequest
* @param callback callback for response
* @throws YarnException if allocate fails
@ -262,7 +337,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
/**
* Finish an UAM/application.
*
* @param uamId identifier for the UAM
* @param uamId uam Id
* @param request FinishApplicationMasterRequest
* @return FinishApplicationMasterResponse
* @throws YarnException if finishApplicationMaster call fails
@ -274,14 +349,15 @@ public class UnmanagedAMPoolManager extends AbstractService {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
LOG.info("Finishing application for UAM id {} ", uamId);
LOG.info("Finishing UAM id {} for application {}", uamId,
this.appIdMap.get(uamId));
FinishApplicationMasterResponse response =
this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request);
if (response.getIsUnregistered()) {
// Only remove the UAM when the unregister finished
this.unmanagedAppMasterMap.remove(uamId);
this.attemptIdMap.remove(uamId);
this.appIdMap.remove(uamId);
LOG.info("UAM id {} is unregistered", uamId);
}
return response;
@ -301,7 +377,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
/**
* Return whether an UAM exists.
*
* @param uamId identifier for the UAM
* @param uamId uam Id
* @return UAM exists or not
*/
public boolean hasUAMId(String uamId) {

View File

@ -50,7 +50,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -90,7 +92,6 @@ public class UnmanagedApplicationManager {
private AMRequestHandlerThread handlerThread;
private ApplicationMasterProtocol rmProxy;
private ApplicationId applicationId;
private ApplicationAttemptId attemptId;
private String submitter;
private String appNameSuffix;
private Configuration conf;
@ -101,9 +102,31 @@ public class UnmanagedApplicationManager {
private ApplicationClientProtocol rmClient;
private long asyncApiPollIntervalMillis;
private RecordFactory recordFactory;
private boolean keepContainersAcrossApplicationAttempts;
/*
* This flag is used as an indication that this method launchUAM/reAttachUAM
* is called (and perhaps blocked in initializeUnmanagedAM below due to RM
* connection/failover issue and not finished yet). Set the flag before
* calling the blocking call to RM.
*/
private boolean connectionInitiated;
/**
* Constructor.
*
* @param conf configuration
* @param appId application Id to use for this UAM
* @param queueName the queue of the UAM
* @param submitter user name of the app
* @param appNameSuffix the app name suffix to use
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
* recovery. See {@link ApplicationSubmissionContext
* #setKeepContainersAcrossApplicationAttempts(boolean)}
*/
public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
String queueName, String submitter, String appNameSuffix) {
String queueName, String submitter, String appNameSuffix,
boolean keepContainersAcrossApplicationAttempts) {
Preconditions.checkNotNull(conf, "Configuration cannot be null");
Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
Preconditions.checkNotNull(submitter, "App submitter cannot be null");
@ -116,6 +139,7 @@ public class UnmanagedApplicationManager {
this.handlerThread = new AMRequestHandlerThread();
this.requestQueue = new LinkedBlockingQueue<>();
this.rmProxy = null;
this.connectionInitiated = false;
this.registerRequest = null;
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
this.asyncApiPollIntervalMillis = conf.getLong(
@ -123,45 +147,84 @@ public class UnmanagedApplicationManager {
YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
YarnConfiguration.
DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
this.keepContainersAcrossApplicationAttempts =
keepContainersAcrossApplicationAttempts;
}
/**
* Launch a new UAM in the resource manager.
*
* @return identifier uam identifier
* @throws YarnException if fails
* @throws IOException if fails
*/
public Token<AMRMTokenIdentifier> launchUAM()
throws YarnException, IOException {
this.connectionInitiated = true;
// Blocking call to RM
Token<AMRMTokenIdentifier> amrmToken =
initializeUnmanagedAM(this.applicationId);
// Creates the UAM connection
createUAMProxy(amrmToken);
return amrmToken;
}
/**
* Re-attach to an existing UAM in the resource manager.
*
* @param amrmToken the UAM token
* @throws IOException if re-attach fails
* @throws YarnException if re-attach fails
*/
public void reAttachUAM(Token<AMRMTokenIdentifier> amrmToken)
throws IOException, YarnException {
this.connectionInitiated = true;
// Creates the UAM connection
createUAMProxy(amrmToken);
}
protected void createUAMProxy(Token<AMRMTokenIdentifier> amrmToken)
throws IOException {
this.userUgi = UserGroupInformation.createProxyUser(
this.applicationId.toString(), UserGroupInformation.getCurrentUser());
this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
this.userUgi, amrmToken);
}
/**
* Registers this {@link UnmanagedApplicationManager} with the resource
* manager.
*
* @param request the register request
* @return the register response
* @param request RegisterApplicationMasterRequest
* @return register response
* @throws YarnException if register fails
* @throws IOException if register fails
*/
public RegisterApplicationMasterResponse createAndRegisterApplicationMaster(
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
// This need to be done first in this method, because it is used as an
// indication that this method is called (and perhaps blocked due to RM
// connection and not finished yet)
// Save the register request for re-register later
this.registerRequest = request;
// attemptId will be available after this call
UnmanagedAMIdentifier identifier =
initializeUnmanagedAM(this.applicationId);
try {
this.userUgi = UserGroupInformation.createProxyUser(
identifier.getAttemptId().toString(),
UserGroupInformation.getCurrentUser());
} catch (IOException e) {
LOG.error("Exception while trying to get current user", e);
throw new YarnRuntimeException(e);
}
this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
this.userUgi, identifier.getToken());
LOG.info("Registering the Unmanaged application master {}", this.attemptId);
// Since we have setKeepContainersAcrossApplicationAttempts = true for UAM.
// We do not expect application already registered exception here
LOG.info("Registering the Unmanaged application master {}",
this.applicationId);
RegisterApplicationMasterResponse response =
this.rmProxy.registerApplicationMaster(this.registerRequest);
for (Container container : response.getContainersFromPreviousAttempts()) {
LOG.info("RegisterUAM returned existing running container "
+ container.getId());
}
for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
LOG.info("RegisterUAM returned existing NM token for node "
+ nmToken.getNodeId());
}
// Only when register succeed that we start the heartbeat thread
this.handlerThread.setUncaughtExceptionHandler(
new HeartBeatThreadUncaughtExceptionHandler());
@ -187,11 +250,11 @@ public class UnmanagedApplicationManager {
this.handlerThread.shutdown();
if (this.rmProxy == null) {
if (this.registerRequest != null) {
// This is possible if the async registerApplicationMaster is still
if (this.connectionInitiated) {
// This is possible if the async launchUAM is still
// blocked and retrying. Return a dummy response in this case.
LOG.warn("Unmanaged AM still not successfully launched/registered yet."
+ " Stopping the UAM client thread anyways.");
+ " Stopping the UAM heartbeat thread anyways.");
return FinishApplicationMasterResponse.newInstance(false);
} else {
throw new YarnException("finishApplicationMaster should not "
@ -199,7 +262,7 @@ public class UnmanagedApplicationManager {
}
}
return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
this.registerRequest, this.attemptId);
this.registerRequest, this.applicationId);
}
/**
@ -212,7 +275,7 @@ public class UnmanagedApplicationManager {
public KillApplicationResponse forceKillApplication()
throws IOException, YarnException {
KillApplicationRequest request =
KillApplicationRequest.newInstance(this.attemptId.getApplicationId());
KillApplicationRequest.newInstance(this.applicationId);
this.handlerThread.shutdown();
@ -240,29 +303,29 @@ public class UnmanagedApplicationManager {
LOG.debug("Interrupted while waiting to put on response queue", ex);
}
// Two possible cases why the UAM is not successfully registered yet:
// 1. registerApplicationMaster is not called at all. Should throw here.
// 2. registerApplicationMaster is called but hasn't successfully returned.
// 1. launchUAM is not called at all. Should throw here.
// 2. launchUAM is called but hasn't successfully returned.
//
// In case 2, we have already save the allocate request above, so if the
// registration succeed later, no request is lost.
if (this.rmProxy == null) {
if (this.registerRequest != null) {
if (this.connectionInitiated) {
LOG.info("Unmanaged AM still not successfully launched/registered yet."
+ " Saving the allocate request and send later.");
} else {
throw new YarnException(
"AllocateAsync should not be called before createAndRegister");
"AllocateAsync should not be called before launchUAM");
}
}
}
/**
* Returns the application attempt id of the UAM.
* Returns the application id of the UAM.
*
* @return attempt id of the UAM
* @return application id of the UAM
*/
public ApplicationAttemptId getAttemptId() {
return this.attemptId;
public ApplicationId getAppId() {
return this.applicationId;
}
/**
@ -287,15 +350,15 @@ public class UnmanagedApplicationManager {
* Launch and initialize an unmanaged AM. First, it creates a new application
* on the RM and negotiates a new attempt id. Then it waits for the RM
* application attempt state to reach YarnApplicationAttemptState.LAUNCHED
* after which it returns the AM-RM token and the attemptId.
* after which it returns the AM-RM token.
*
* @param appId application id
* @return the UAM identifier
* @return the UAM token
* @throws IOException if initialize fails
* @throws YarnException if initialize fails
*/
protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId)
throws IOException, YarnException {
protected Token<AMRMTokenIdentifier> initializeUnmanagedAM(
ApplicationId appId) throws IOException, YarnException {
try {
UserGroupInformation appSubmitter =
UserGroupInformation.createRemoteUser(this.submitter);
@ -306,13 +369,12 @@ public class UnmanagedApplicationManager {
submitUnmanagedApp(appId);
// Monitor the application attempt to wait for launch state
ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId,
monitorCurrentAppAttempt(appId,
EnumSet.of(YarnApplicationState.ACCEPTED,
YarnApplicationState.RUNNING, YarnApplicationState.KILLED,
YarnApplicationState.FAILED, YarnApplicationState.FINISHED),
YarnApplicationAttemptState.LAUNCHED);
this.attemptId = attemptReport.getApplicationAttemptId();
return getUAMIdentifier();
return getUAMToken();
} finally {
this.rmClient = null;
}
@ -343,6 +405,8 @@ public class UnmanagedApplicationManager {
submitRequest.setApplicationSubmissionContext(context);
context.setUnmanagedAM(true);
context.setKeepContainersAcrossApplicationAttempts(
this.keepContainersAcrossApplicationAttempts);
LOG.info("Submitting unmanaged application {}", appId);
this.rmClient.submitApplication(submitRequest);
@ -374,8 +438,10 @@ public class UnmanagedApplicationManager {
if (appStates.contains(state)) {
if (state != YarnApplicationState.ACCEPTED) {
throw new YarnRuntimeException(
"Received non-accepted application state: " + state
+ ". Application " + appId + " not the first attempt?");
"Received non-accepted application state: " + state + " for "
+ appId + ". This is likely because this is not the first "
+ "app attempt in home sub-cluster, and AMRMProxy HA "
+ "(yarn.nodemanager.amrmproxy.ha.enable) is not enabled.");
}
appAttemptId =
getApplicationReport(appId).getCurrentApplicationAttemptId();
@ -415,25 +481,25 @@ public class UnmanagedApplicationManager {
}
/**
* Gets the identifier of the unmanaged AM.
* Gets the amrmToken of the unmanaged AM.
*
* @return the identifier of the unmanaged AM.
* @return the amrmToken of the unmanaged AM.
* @throws IOException if getApplicationReport fails
* @throws YarnException if getApplicationReport fails
*/
protected UnmanagedAMIdentifier getUAMIdentifier()
protected Token<AMRMTokenIdentifier> getUAMToken()
throws IOException, YarnException {
Token<AMRMTokenIdentifier> token = null;
org.apache.hadoop.yarn.api.records.Token amrmToken =
getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken();
getApplicationReport(this.applicationId).getAMRMToken();
if (amrmToken != null) {
token = ConverterUtils.convertFromYarn(amrmToken, (Text) null);
} else {
LOG.warn(
"AMRMToken not found in the application report for application: {}",
this.attemptId.getApplicationId());
this.applicationId);
}
return new UnmanagedAMIdentifier(this.attemptId, token);
return token;
}
private ApplicationReport getApplicationReport(ApplicationId appId)
@ -444,29 +510,6 @@ public class UnmanagedApplicationManager {
return this.rmClient.getApplicationReport(request).getApplicationReport();
}
/**
* Data structure that encapsulates the application attempt identifier and the
* AMRMTokenIdentifier. Make it public because clients with HA need it.
*/
public static class UnmanagedAMIdentifier {
private ApplicationAttemptId attemptId;
private Token<AMRMTokenIdentifier> token;
public UnmanagedAMIdentifier(ApplicationAttemptId attemptId,
Token<AMRMTokenIdentifier> token) {
this.attemptId = attemptId;
this.token = token;
}
public ApplicationAttemptId getAttemptId() {
return this.attemptId;
}
public Token<AMRMTokenIdentifier> getToken() {
return this.token;
}
}
/**
* Data structure that encapsulates AllocateRequest and AsyncCallback
* instance.
@ -549,8 +592,10 @@ public class UnmanagedApplicationManager {
}
request.setResponseId(lastResponseId);
AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
request, rmProxy, registerRequest, attemptId);
request, rmProxy, registerRequest, applicationId);
if (response == null) {
throw new YarnException("Null allocateResponse from allocate");
}
@ -578,18 +623,17 @@ public class UnmanagedApplicationManager {
LOG.debug("Interrupted while waiting for queue", ex);
}
} catch (IOException ex) {
LOG.warn(
"IO Error occurred while processing heart beat for " + attemptId,
ex);
LOG.warn("IO Error occurred while processing heart beat for "
+ applicationId, ex);
} catch (Throwable ex) {
LOG.warn(
"Error occurred while processing heart beat for " + attemptId,
"Error occurred while processing heart beat for " + applicationId,
ex);
}
}
LOG.info("UnmanagedApplicationManager has been stopped for {}. "
+ "AMRequestHandlerThread thread is exiting", attemptId);
+ "AMRequestHandlerThread thread is exiting", applicationId);
}
}
@ -600,8 +644,8 @@ public class UnmanagedApplicationManager {
implements UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Heartbeat thread {} for application attempt {} crashed!",
t.getName(), attemptId, e);
LOG.error("Heartbeat thread {} for application {} crashed!",
t.getName(), applicationId, e);
}
}
}

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@ -63,16 +63,16 @@ public final class AMRMClientUtils {
/**
* Handle ApplicationNotRegistered exception and re-register.
*
* @param attemptId app attemptId
* @param appId application Id
* @param rmProxy RM proxy instance
* @param registerRequest the AM re-register request
* @throws YarnException if re-register fails
*/
public static void handleNotRegisteredExceptionAndReRegister(
ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy,
ApplicationId appId, ApplicationMasterProtocol rmProxy,
RegisterApplicationMasterRequest registerRequest) throws YarnException {
LOG.info("App attempt {} not registered, most likely due to RM failover. "
+ " Trying to re-register.", attemptId);
+ " Trying to re-register.", appId);
try {
rmProxy.registerApplicationMaster(registerRequest);
} catch (Exception e) {
@ -93,25 +93,24 @@ public final class AMRMClientUtils {
* @param request allocate request
* @param rmProxy RM proxy
* @param registerRequest the register request for re-register
* @param attemptId application attempt id
* @param appId application id
* @return allocate response
* @throws YarnException if RM call fails
* @throws IOException if RM call fails
*/
public static AllocateResponse allocateWithReRegister(AllocateRequest request,
ApplicationMasterProtocol rmProxy,
RegisterApplicationMasterRequest registerRequest,
ApplicationAttemptId attemptId) throws YarnException, IOException {
RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
throws YarnException, IOException {
try {
return rmProxy.allocate(request);
} catch (ApplicationMasterNotRegisteredException e) {
handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
registerRequest);
// reset responseId after re-register
request.setResponseId(0);
// retry allocate
return allocateWithReRegister(request, rmProxy, registerRequest,
attemptId);
return allocateWithReRegister(request, rmProxy, registerRequest, appId);
}
}
@ -123,23 +122,22 @@ public final class AMRMClientUtils {
* @param request finishApplicationMaster request
* @param rmProxy RM proxy
* @param registerRequest the register request for re-register
* @param attemptId application attempt id
* @param appId application id
* @return finishApplicationMaster response
* @throws YarnException if RM call fails
* @throws IOException if RM call fails
*/
public static FinishApplicationMasterResponse finishAMWithReRegister(
FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
RegisterApplicationMasterRequest registerRequest,
ApplicationAttemptId attemptId) throws YarnException, IOException {
RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
throws YarnException, IOException {
try {
return rmProxy.finishApplicationMaster(request);
} catch (ApplicationMasterNotRegisteredException ex) {
handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
registerRequest);
// retry finishAM after re-register
return finishAMWithReRegister(request, rmProxy, registerRequest,
attemptId);
return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
}
}

View File

@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
@ -171,10 +172,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
LoggerFactory.getLogger(MockResourceManagerFacade.class);
private HashSet<ApplicationId> applicationMap = new HashSet<>();
private HashMap<String, List<ContainerId>> applicationContainerIdMap =
new HashMap<String, List<ContainerId>>();
private HashMap<ContainerId, Container> allocatedContainerMap =
new HashMap<ContainerId, Container>();
private HashSet<ApplicationId> keepContainerOnUams = new HashSet<>();
private HashMap<ApplicationAttemptId, List<ContainerId>>
applicationContainerIdMap = new HashMap<>();
private AtomicInteger containerIndex = new AtomicInteger(0);
private Configuration conf;
private int subClusterId;
@ -215,7 +215,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
this.isRunning = mode;
}
private static String getAppIdentifier() throws IOException {
private static ApplicationAttemptId getAppIdentifier() throws IOException {
AMRMTokenIdentifier result = null;
UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
@ -225,7 +225,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
break;
}
}
return result != null ? result.getApplicationAttemptId().toString() : "";
return result != null ? result.getApplicationAttemptId()
: ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0);
}
private void validateRunning() throws ConnectException {
@ -240,19 +241,32 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
throws YarnException, IOException {
validateRunning();
String amrmToken = getAppIdentifier();
LOG.info("Registering application attempt: " + amrmToken);
ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Registering application attempt: " + attemptId);
shouldReRegisterNext = false;
List<Container> containersFromPreviousAttempt = null;
synchronized (applicationContainerIdMap) {
if (applicationContainerIdMap.containsKey(amrmToken)) {
if (applicationContainerIdMap.containsKey(attemptId)) {
if (keepContainerOnUams.contains(attemptId.getApplicationId())) {
// For UAM with the keepContainersFromPreviousAttempt flag, return all
// running containers
containersFromPreviousAttempt = new ArrayList<>();
for (ContainerId containerId : applicationContainerIdMap
.get(attemptId)) {
containersFromPreviousAttempt.add(Container.newInstance(containerId,
null, null, null, null, null));
}
} else {
throw new InvalidApplicationMasterRequestException(
AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
}
} else {
// Keep track of the containers that are returned to this application
applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
applicationContainerIdMap.put(attemptId, new ArrayList<ContainerId>());
}
}
// Make sure we wait for certain test cases last in the method
@ -272,7 +286,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
}
return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
null, request.getHost(), null);
containersFromPreviousAttempt, request.getHost(), null);
}
@Override
@ -282,8 +296,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
validateRunning();
String amrmToken = getAppIdentifier();
LOG.info("Finishing application attempt: " + amrmToken);
ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Finishing application attempt: " + attemptId);
if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register.";
@ -293,12 +307,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
synchronized (applicationContainerIdMap) {
// Remove the containers that were being tracked for this application
Assert.assertTrue("The application id is NOT registered: " + amrmToken,
applicationContainerIdMap.containsKey(amrmToken));
List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken);
for (ContainerId c : ids) {
allocatedContainerMap.remove(c);
}
Assert.assertTrue("The application id is NOT registered: " + attemptId,
applicationContainerIdMap.containsKey(attemptId));
applicationContainerIdMap.remove(attemptId);
}
return FinishApplicationMasterResponse.newInstance(
@ -328,8 +339,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
+ "askList and releaseList in the same heartbeat");
}
String amrmToken = getAppIdentifier();
LOG.info("Allocate from application attempt: " + amrmToken);
ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Allocate from application attempt: " + attemptId);
if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register.";
@ -361,16 +372,16 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
// will need it in future
Assert.assertTrue(
"The application id is Not registered before allocate(): "
+ amrmToken,
applicationContainerIdMap.containsKey(amrmToken));
List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
+ attemptId,
applicationContainerIdMap.containsKey(attemptId));
List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
ids.add(containerId);
this.allocatedContainerMap.put(containerId, container);
}
}
}
}
List<ContainerStatus> completedList = new ArrayList<>();
if (request.getReleaseList() != null
&& request.getReleaseList().size() > 0) {
LOG.info("Releasing containers: " + request.getReleaseList().size());
@ -378,9 +389,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
Assert
.assertTrue(
"The application id is not registered before allocate(): "
+ amrmToken,
applicationContainerIdMap.containsKey(amrmToken));
List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
+ attemptId,
applicationContainerIdMap.containsKey(attemptId));
List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
for (ContainerId id : request.getReleaseList()) {
boolean found = false;
@ -396,18 +407,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
+ conf.get("AMRMTOKEN"), found);
ids.remove(id);
// Return the released container back to the AM with new fake Ids. The
// test case does not care about the IDs. The IDs are faked because
// otherwise the LRM will throw duplication identifier exception. This
// returning of fake containers is ONLY done for testing purpose - for
// the test code to get confirmation that the sub-cluster resource
// managers received the release request
ContainerId fakeContainerId = ContainerId.newInstance(
getApplicationAttemptId(1), containerIndex.incrementAndGet());
Container fakeContainer = allocatedContainerMap.get(id);
fakeContainer.setId(fakeContainerId);
containerList.add(fakeContainer);
completedList.add(
ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0));
}
}
}
@ -418,8 +419,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
// Always issue a new AMRMToken as if RM rolled master key
Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], "");
return AllocateResponse.newInstance(0,
new ArrayList<ContainerStatus>(), containerList,
return AllocateResponse.newInstance(0, completedList, containerList,
new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
new ArrayList<NMToken>(), newAMRMToken,
new ArrayList<UpdatedContainer>(), null);
@ -438,6 +438,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
report.setApplicationId(request.getApplicationId());
report.setCurrentApplicationAttemptId(
ApplicationAttemptId.newInstance(request.getApplicationId(), 1));
report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], ""));
response.setApplicationReport(report);
return response;
}
@ -481,6 +482,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
}
LOG.info("Application submitted: " + appId);
applicationMap.add(appId);
if (request.getApplicationSubmissionContext().getUnmanagedAM()
|| request.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
keepContainerOnUams.add(appId);
}
return SubmitApplicationResponse.newInstance();
}
@ -497,6 +504,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
throw new ApplicationNotFoundException(
"Trying to kill an absent application: " + appId);
}
keepContainerOnUams.remove(appId);
}
LOG.info("Force killing application: " + appId);
return KillApplicationResponse.newInstance(true);

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Unit test for FederationRegistryClient.
*/
public class TestFederationRegistryClient {
private Configuration conf;
private UserGroupInformation user;
private RegistryOperations registry;
private FederationRegistryClient registryClient;
@Before
public void setup() throws Exception {
this.conf = new YarnConfiguration();
this.registry = new FSRegistryOperationsService();
this.registry.init(this.conf);
this.registry.start();
this.user = UserGroupInformation.getCurrentUser();
this.registryClient =
new FederationRegistryClient(this.conf, this.registry, this.user);
this.registryClient.cleanAllApplications();
Assert.assertEquals(0, this.registryClient.getAllApplications().size());
}
@After
public void breakDown() {
registryClient.cleanAllApplications();
Assert.assertEquals(0, registryClient.getAllApplications().size());
registry.stop();
}
@Test
public void testBasicCase() {
ApplicationId appId = ApplicationId.newInstance(0, 0);
String scId1 = "subcluster1";
String scId2 = "subcluster2";
this.registryClient.writeAMRMTokenForUAM(appId, scId1,
new Token<AMRMTokenIdentifier>());
this.registryClient.writeAMRMTokenForUAM(appId, scId2,
new Token<AMRMTokenIdentifier>());
// Duplicate entry, should overwrite
this.registryClient.writeAMRMTokenForUAM(appId, scId1,
new Token<AMRMTokenIdentifier>());
Assert.assertEquals(1, this.registryClient.getAllApplications().size());
Assert.assertEquals(2,
this.registryClient.loadStateFromRegistry(appId).size());
this.registryClient.removeAppFromRegistry(appId);
Assert.assertEquals(0, this.registryClient.getAllApplications().size());
Assert.assertEquals(0,
this.registryClient.loadStateFromRegistry(appId).size());
}
}

View File

@ -65,7 +65,7 @@ public class TestUnmanagedApplicationManager {
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
uam = new TestableUnmanagedApplicationManager(conf,
attemptId.getApplicationId(), null, "submitter", "appNameSuffix");
attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true);
}
protected void waitForCallBackCountAndCheckZeroPending(
@ -88,7 +88,8 @@ public class TestUnmanagedApplicationManager {
public void testBasicUsage()
throws YarnException, IOException, InterruptedException {
createAndRegisterApplicationMaster(
launchUAM(attemptId);
registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
@ -102,11 +103,48 @@ public class TestUnmanagedApplicationManager {
attemptId);
}
/*
* Test re-attaching of an existing UAM. This is for HA of UAM client.
*/
@Test(timeout = 5000)
public void testUAMReAttach()
throws YarnException, IOException, InterruptedException {
launchUAM(attemptId);
registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
attemptId);
// Wait for outstanding async allocate callback
waitForCallBackCountAndCheckZeroPending(callback, 1);
MockResourceManagerFacade rmProxy = uam.getRMProxy();
uam = new TestableUnmanagedApplicationManager(conf,
attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true);
uam.setRMProxy(rmProxy);
reAttachUAM(null, attemptId);
registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
attemptId);
// Wait for outstanding async allocate callback
waitForCallBackCountAndCheckZeroPending(callback, 2);
finishApplicationMaster(
FinishApplicationMasterRequest.newInstance(null, null, null),
attemptId);
}
@Test(timeout = 5000)
public void testReRegister()
throws YarnException, IOException, InterruptedException {
createAndRegisterApplicationMaster(
launchUAM(attemptId);
registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
uam.setShouldReRegisterNext();
@ -137,7 +175,8 @@ public class TestUnmanagedApplicationManager {
@Override
public void run() {
try {
createAndRegisterApplicationMaster(
launchUAM(attemptId);
registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 1001, null),
attemptId);
} catch (Exception e) {
@ -221,7 +260,8 @@ public class TestUnmanagedApplicationManager {
@Test
public void testForceKill()
throws YarnException, IOException, InterruptedException {
createAndRegisterApplicationMaster(
launchUAM(attemptId);
registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
uam.forceKillApplication();
@ -241,8 +281,31 @@ public class TestUnmanagedApplicationManager {
return ugi;
}
protected RegisterApplicationMasterResponse
createAndRegisterApplicationMaster(
protected Token<AMRMTokenIdentifier> launchUAM(
ApplicationAttemptId appAttemptId)
throws IOException, InterruptedException {
return getUGIWithToken(appAttemptId)
.doAs(new PrivilegedExceptionAction<Token<AMRMTokenIdentifier>>() {
@Override
public Token<AMRMTokenIdentifier> run() throws Exception {
return uam.launchUAM();
}
});
}
protected void reAttachUAM(final Token<AMRMTokenIdentifier> uamToken,
ApplicationAttemptId appAttemptId)
throws IOException, InterruptedException {
getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Token<AMRMTokenIdentifier> run() throws Exception {
uam.reAttachUAM(uamToken);
return null;
}
});
}
protected RegisterApplicationMasterResponse registerApplicationMaster(
final RegisterApplicationMasterRequest request,
ApplicationAttemptId appAttemptId)
throws YarnException, IOException, InterruptedException {
@ -251,9 +314,7 @@ public class TestUnmanagedApplicationManager {
@Override
public RegisterApplicationMasterResponse run()
throws YarnException, IOException {
RegisterApplicationMasterResponse response =
uam.createAndRegisterApplicationMaster(request);
return response;
return uam.registerApplicationMaster(request);
}
});
}
@ -311,8 +372,9 @@ public class TestUnmanagedApplicationManager {
public TestableUnmanagedApplicationManager(Configuration conf,
ApplicationId appId, String queueName, String submitter,
String appNameSuffix) {
super(conf, appId, queueName, submitter, appNameSuffix);
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
super(conf, appId, queueName, submitter, appNameSuffix,
keepContainersAcrossApplicationAttempts);
}
@SuppressWarnings("unchecked")
@ -330,6 +392,14 @@ public class TestUnmanagedApplicationManager {
rmProxy.setShouldReRegisterNext();
}
}
public MockResourceManagerFacade getRMProxy() {
return rmProxy;
}
public void setRMProxy(MockResourceManagerFacade proxy) {
this.rmProxy = proxy;
}
}
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@ -67,4 +69,18 @@ public interface AMRMProxyApplicationContext {
*/
Context getNMCotext();
/**
* Gets the credentials of this application.
*
* @return the credentials.
*/
Credentials getCredentials();
/**
* Gets the registry client.
*
* @return the registry.
*/
RegistryOperations getRegistryClient();
}

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -42,6 +44,8 @@ public class AMRMProxyApplicationContextImpl implements
private Integer localTokenKeyId;
private Token<AMRMTokenIdentifier> amrmToken;
private Token<AMRMTokenIdentifier> localToken;
private Credentials credentials;
private RegistryOperations registry;
/**
* Create an instance of the AMRMProxyApplicationContext.
@ -52,17 +56,23 @@ public class AMRMProxyApplicationContextImpl implements
* @param user user name of the application
* @param amrmToken amrmToken issued by RM
* @param localToken amrmToken issued by AMRMProxy
* @param credentials application credentials
* @param registry Yarn Registry client
*/
public AMRMProxyApplicationContextImpl(Context nmContext,
Configuration conf, ApplicationAttemptId applicationAttemptId,
String user, Token<AMRMTokenIdentifier> amrmToken,
Token<AMRMTokenIdentifier> localToken) {
@SuppressWarnings("checkstyle:parameternumber")
public AMRMProxyApplicationContextImpl(Context nmContext, Configuration conf,
ApplicationAttemptId applicationAttemptId, String user,
Token<AMRMTokenIdentifier> amrmToken,
Token<AMRMTokenIdentifier> localToken, Credentials credentials,
RegistryOperations registry) {
this.nmContext = nmContext;
this.conf = conf;
this.applicationAttemptId = applicationAttemptId;
this.user = user;
this.amrmToken = amrmToken;
this.localToken = localToken;
this.credentials = credentials;
this.registry = registry;
}
@Override
@ -88,11 +98,14 @@ public class AMRMProxyApplicationContextImpl implements
/**
* Sets the application's AMRMToken.
*
* @param amrmToken amrmToken issued by RM
* @param amrmToken the new amrmToken from RM
* @return whether the saved token is updated to a different value
*/
public synchronized void setAMRMToken(
public synchronized boolean setAMRMToken(
Token<AMRMTokenIdentifier> amrmToken) {
Token<AMRMTokenIdentifier> oldValue = this.amrmToken;
this.amrmToken = amrmToken;
return !this.amrmToken.equals(oldValue);
}
@Override
@ -134,4 +147,14 @@ public class AMRMProxyApplicationContextImpl implements
public Context getNMCotext() {
return nmContext;
}
@Override
public Credentials getCredentials() {
return this.credentials;
}
@Override
public RegistryOperations getRegistryClient() {
return this.registry;
}
}

View File

@ -34,12 +34,13 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@ -60,15 +61,19 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -82,7 +87,7 @@ import com.google.common.base.Preconditions;
* pipeline is a chain of interceptor instances that can inspect and modify the
* request/response as needed.
*/
public class AMRMProxyService extends AbstractService implements
public class AMRMProxyService extends CompositeService implements
ApplicationMasterProtocol {
private static final Logger LOG = LoggerFactory
.getLogger(AMRMProxyService.class);
@ -96,6 +101,7 @@ public class AMRMProxyService extends AbstractService implements
private InetSocketAddress listenerEndpoint;
private AMRMProxyTokenSecretManager secretManager;
private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
private RegistryOperations registry;
/**
* Creates an instance of the service.
@ -118,10 +124,23 @@ public class AMRMProxyService extends AbstractService implements
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
this.secretManager =
new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore());
this.secretManager.init(conf);
// Both second app attempt and NM restart within Federation need registry
if (conf.getBoolean(YarnConfiguration.AMRM_PROXY_HA_ENABLED,
YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED)
|| conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)) {
this.registry = FederationStateStoreFacade.createInstance(conf,
YarnConfiguration.YARN_REGISTRY_CLASS,
YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS,
RegistryOperations.class);
addService(this.registry);
}
super.serviceInit(conf);
}
@Override
@ -203,6 +222,8 @@ public class AMRMProxyService extends AbstractService implements
amrmToken = new Token<>();
amrmToken.decodeFromUrlString(
new String(contextEntry.getValue(), "UTF-8"));
// Clear the service field, as if RM just issued the token
amrmToken.setService(new Text());
}
}
@ -214,12 +235,36 @@ public class AMRMProxyService extends AbstractService implements
throw new IOException("No user found for app attempt " + attemptId);
}
// Regenerate the local AMRMToken for the AM
Token<AMRMTokenIdentifier> localToken =
this.secretManager.createAndGetAMRMToken(attemptId);
// Retrieve the AM container credentials from NM context
Credentials amCred = null;
for (Container container : this.nmContext.getContainers().values()) {
LOG.debug("From NM Context container " + container.getContainerId());
if (container.getContainerId().getApplicationAttemptId().equals(
attemptId) && container.getContainerTokenIdentifier() != null) {
LOG.debug("Container type "
+ container.getContainerTokenIdentifier().getContainerType());
if (container.getContainerTokenIdentifier()
.getContainerType() == ContainerType.APPLICATION_MASTER) {
LOG.info("AM container {} found in context, has credentials: {}",
container.getContainerId(),
(container.getCredentials() != null));
amCred = container.getCredentials();
}
}
}
if (amCred == null) {
LOG.error("No credentials found for AM container of {}. "
+ "Yarn registry access might not work", attemptId);
}
// Create the intercepter pipeline for the AM
initializePipeline(attemptId, user, amrmToken, localToken,
entry.getValue(), true);
} catch (Exception e) {
entry.getValue(), true, amCred);
} catch (IOException e) {
LOG.error("Exception when recovering " + attemptId
+ ", removing it from NMStateStore and move on", e);
this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
@ -326,7 +371,7 @@ public class AMRMProxyService extends AbstractService implements
initializePipeline(appAttemptId,
containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken,
localToken, null, false);
localToken, null, false, credentials);
}
/**
@ -342,7 +387,8 @@ public class AMRMProxyService extends AbstractService implements
protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
String user, Token<AMRMTokenIdentifier> amrmToken,
Token<AMRMTokenIdentifier> localToken,
Map<String, byte[]> recoveredDataMap, boolean isRecovery) {
Map<String, byte[]> recoveredDataMap, boolean isRecovery,
Credentials credentials) {
RequestInterceptorChainWrapper chainWrapper = null;
synchronized (applPipelineMap) {
if (applPipelineMap
@ -404,8 +450,9 @@ public class AMRMProxyService extends AbstractService implements
try {
RequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(createApplicationMasterContext(this.nmContext,
applicationAttemptId, user, amrmToken, localToken));
interceptorChain.init(
createApplicationMasterContext(this.nmContext, applicationAttemptId,
user, amrmToken, localToken, credentials, this.registry));
if (isRecovery) {
if (recoveredDataMap == null) {
throw new YarnRuntimeException(
@ -497,14 +544,12 @@ public class AMRMProxyService extends AbstractService implements
allocateResponse.setAMRMToken(null);
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
token.getIdentifier().array(), token.getPassword().array(),
new Text(token.getKind()), new Text(token.getService()));
ConverterUtils.convertFromYarn(token, (Text) null);
context.setAMRMToken(newToken);
// Update the AMRMToken in context map in NM state store
if (this.nmContext.getNMStateStore() != null) {
// Update the AMRMToken in context map, and in NM state store if it is
// different
if (context.setAMRMToken(newToken)
&& this.nmContext.getNMStateStore() != null) {
try {
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY,
@ -547,10 +592,12 @@ public class AMRMProxyService extends AbstractService implements
private AMRMProxyApplicationContext createApplicationMasterContext(
Context context, ApplicationAttemptId applicationAttemptId, String user,
Token<AMRMTokenIdentifier> amrmToken,
Token<AMRMTokenIdentifier> localToken) {
Token<AMRMTokenIdentifier> localToken, Credentials credentials,
RegistryOperations registryImpl) {
AMRMProxyApplicationContextImpl appContext =
new AMRMProxyApplicationContextImpl(context, getConfig(),
applicationAttemptId, user, amrmToken, localToken);
applicationAttemptId, user, amrmToken, localToken, credentials,
registryImpl);
return appContext;
}

View File

@ -34,6 +34,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@ -42,6 +44,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -56,17 +59,20 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -145,6 +151,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private UserGroupInformation appOwner;
private FederationRegistryClient registryClient;
/**
* Creates an instance of the FederationInterceptor class.
*/
@ -179,6 +187,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} catch (Exception ex) {
throw new YarnRuntimeException(ex);
}
// Add all app tokens for Yarn Registry access
if (this.registryClient != null && appContext.getCredentials() != null) {
this.appOwner.addCredentials(appContext.getCredentials());
}
this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
@ -192,6 +204,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.uamPool.init(conf);
this.uamPool.start();
if (appContext.getRegistryClient() != null) {
this.registryClient = new FederationRegistryClient(conf,
appContext.getRegistryClient(), this.appOwner);
}
}
/**
@ -250,20 +267,27 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
this.amRegistrationResponse =
this.homeRM.registerApplicationMaster(request);
if (this.amRegistrationResponse
.getContainersFromPreviousAttempts() != null) {
cacheAllocatedContainers(
this.amRegistrationResponse.getContainersFromPreviousAttempts(),
this.homeSubClusterId);
}
ApplicationId appId =
getApplicationContext().getApplicationAttemptId().getApplicationId();
reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
// the queue this application belongs will be used for getting
// AMRMProxy policy from state store.
String queue = this.amRegistrationResponse.getQueue();
if (queue == null) {
LOG.warn("Received null queue for application "
+ getApplicationContext().getApplicationAttemptId().getApplicationId()
+ " from home sub-cluster. Will use default queue name "
LOG.warn("Received null queue for application " + appId
+ " from home subcluster. Will use default queue name "
+ YarnConfiguration.DEFAULT_QUEUE_NAME
+ " for getting AMRMProxyPolicy");
} else {
LOG.info("Application "
+ getApplicationContext().getApplicationAttemptId().getApplicationId()
+ " belongs to queue " + queue);
LOG.info("Application " + appId + " belongs to queue " + queue);
}
// Initialize the AMRMProxyPolicy
@ -304,7 +328,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
requests.get(this.homeSubClusterId), this.homeRM,
this.amRegistrationRequest,
getApplicationContext().getApplicationAttemptId());
getApplicationContext().getApplicationAttemptId().getApplicationId());
// Notify policy of home response
try {
@ -393,8 +417,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// request to the home resource manager on this thread.
FinishApplicationMasterResponse homeResponse =
AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
this.amRegistrationRequest,
getApplicationContext().getApplicationAttemptId());
this.amRegistrationRequest, getApplicationContext()
.getApplicationAttemptId().getApplicationId());
if (subClusterIds.size() > 0) {
// Wait for other sub-cluster resource managers to return the
@ -425,6 +449,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (failedToUnRegister) {
homeResponse.setIsUnregistered(false);
} else {
// Clean up UAMs only when the app finishes successfully, so that no more
// attempt will be launched.
this.uamPool.stop();
if (this.registryClient != null) {
this.registryClient.removeAppFromRegistry(getApplicationContext()
.getApplicationAttemptId().getApplicationId());
}
}
return homeResponse;
}
@ -442,9 +474,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
@Override
public void shutdown() {
if (this.uamPool != null) {
this.uamPool.stop();
}
// Do not stop uamPool service and kill UAMs here because of possible second
// app attempt
if (threadpool != null) {
try {
threadpool.shutdown();
@ -455,6 +486,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
super.shutdown();
}
/**
* Only for unit test cleanup.
*/
@VisibleForTesting
protected void cleanupRegistry() {
if (this.registryClient != null) {
this.registryClient.cleanAllApplications();
}
}
/**
* Create the UAM pool manager for secondary sub-clsuters. For unit test to
* override.
@ -486,6 +527,120 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
private void mergeRegisterResponse(
RegisterApplicationMasterResponse homeResponse,
RegisterApplicationMasterResponse otherResponse) {
if (!isNullOrEmpty(otherResponse.getContainersFromPreviousAttempts())) {
if (!isNullOrEmpty(homeResponse.getContainersFromPreviousAttempts())) {
homeResponse.getContainersFromPreviousAttempts()
.addAll(otherResponse.getContainersFromPreviousAttempts());
} else {
homeResponse.setContainersFromPreviousAttempts(
otherResponse.getContainersFromPreviousAttempts());
}
}
if (!isNullOrEmpty(otherResponse.getNMTokensFromPreviousAttempts())) {
if (!isNullOrEmpty(homeResponse.getNMTokensFromPreviousAttempts())) {
homeResponse.getNMTokensFromPreviousAttempts()
.addAll(otherResponse.getNMTokensFromPreviousAttempts());
} else {
homeResponse.setNMTokensFromPreviousAttempts(
otherResponse.getNMTokensFromPreviousAttempts());
}
}
}
/**
* Try re-attach to all existing and running UAMs in secondary sub-clusters
* launched by previous application attempts if any. All running containers in
* the UAMs will be combined into the registerResponse. For the first attempt,
* the registry will be empty for this application and thus no-op here.
*/
protected void reAttachUAMAndMergeRegisterResponse(
RegisterApplicationMasterResponse homeResponse,
final ApplicationId appId) {
if (this.registryClient == null) {
// Both AMRMProxy HA and NM work preserving restart is not enabled
LOG.warn("registryClient is null, skip attaching existing UAM if any");
return;
}
// Load existing running UAMs from the previous attempts from
// registry, if any
Map<String, Token<AMRMTokenIdentifier>> uamMap =
this.registryClient.loadStateFromRegistry(appId);
if (uamMap.size() == 0) {
LOG.info("No existing UAM for application {} found in Yarn Registry",
appId);
return;
}
LOG.info("Found {} existing UAMs for application {} in Yarn Registry. "
+ "Reattaching in parallel", uamMap.size(), appId);
ExecutorCompletionService<RegisterApplicationMasterResponse>
completionService = new ExecutorCompletionService<>(threadpool);
for (Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
final SubClusterId subClusterId =
SubClusterId.newInstance(entry.getKey());
final Token<AMRMTokenIdentifier> amrmToken = entry.getValue();
completionService
.submit(new Callable<RegisterApplicationMasterResponse>() {
@Override
public RegisterApplicationMasterResponse call() throws Exception {
RegisterApplicationMasterResponse response = null;
try {
// Create a config loaded with federation on and subclusterId
// for each UAM
YarnConfiguration config = new YarnConfiguration(getConf());
FederationProxyProviderUtil.updateConfForFederation(config,
subClusterId.getId());
uamPool.reAttachUAM(subClusterId.getId(), config, appId,
amRegistrationResponse.getQueue(),
getApplicationContext().getUser(), homeSubClusterId.getId(),
amrmToken);
response = uamPool.registerApplicationMaster(
subClusterId.getId(), amRegistrationRequest);
if (response != null
&& response.getContainersFromPreviousAttempts() != null) {
cacheAllocatedContainers(
response.getContainersFromPreviousAttempts(),
subClusterId);
}
LOG.info("UAM {} reattached for {}", subClusterId, appId);
} catch (Throwable e) {
LOG.error(
"Reattaching UAM " + subClusterId + " failed for " + appId,
e);
}
return response;
}
});
}
// Wait for the re-attach responses
for (int i = 0; i < uamMap.size(); i++) {
try {
Future<RegisterApplicationMasterResponse> future =
completionService.take();
RegisterApplicationMasterResponse registerResponse = future.get();
if (registerResponse != null) {
LOG.info("Merging register response for {}", appId);
mergeRegisterResponse(homeResponse, registerResponse);
}
} catch (Exception e) {
LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, e);
}
}
}
private SubClusterId getSubClusterForNode(String nodeName) {
SubClusterId subClusterId = null;
try {
@ -655,6 +810,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
responses.add(response);
}
// Save the new AMRMToken for the UAM in registry if present
if (response.getAMRMToken() != null) {
Token<AMRMTokenIdentifier> newToken = ConverterUtils
.convertFromYarn(response.getAMRMToken(), (Text) null);
// Update the token in registry
if (registryClient != null) {
registryClient
.writeAMRMTokenForUAM(
getApplicationContext().getApplicationAttemptId()
.getApplicationId(),
subClusterId.getId(), newToken);
}
}
// Notify policy of secondary sub-cluster responses
try {
policyInterpreter.notifyOfResponse(subClusterId, response);
@ -714,20 +883,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
subClusterId);
RegisterApplicationMasterResponse uamResponse = null;
Token<AMRMTokenIdentifier> token = null;
try {
// For appNameSuffix, use subClusterId of the home sub-cluster
uamResponse = uamPool.createAndRegisterNewUAM(subClusterId,
registerRequest, config,
token = uamPool.launchUAM(subClusterId, config,
appContext.getApplicationAttemptId().getApplicationId(),
amRegistrationResponse.getQueue(), appContext.getUser(),
homeSubClusterId.toString());
homeSubClusterId.toString(), registryClient != null);
uamResponse = uamPool.registerApplicationMaster(subClusterId,
registerRequest);
} catch (Throwable e) {
LOG.error("Failed to register application master: "
+ subClusterId + " Application: "
+ appContext.getApplicationAttemptId(), e);
}
return new RegisterApplicationMasterResponseInfo(uamResponse,
SubClusterId.newInstance(subClusterId));
SubClusterId.newInstance(subClusterId), token);
}
});
}
@ -752,6 +924,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
+ getApplicationContext().getApplicationAttemptId());
successfulRegistrations.put(uamResponse.getSubClusterId(),
uamResponse.getResponse());
if (registryClient != null) {
registryClient.writeAMRMTokenForUAM(
getApplicationContext().getApplicationAttemptId()
.getApplicationId(),
uamResponse.getSubClusterId().getId(),
uamResponse.getUamToken());
}
}
} catch (Exception e) {
LOG.warn("Failed to register unmanaged application master: "
@ -1087,11 +1267,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private static class RegisterApplicationMasterResponseInfo {
private RegisterApplicationMasterResponse response;
private SubClusterId subClusterId;
private Token<AMRMTokenIdentifier> uamToken;
RegisterApplicationMasterResponseInfo(
RegisterApplicationMasterResponse response, SubClusterId subClusterId) {
RegisterApplicationMasterResponse response, SubClusterId subClusterId,
Token<AMRMTokenIdentifier> uamToken) {
this.response = response;
this.subClusterId = subClusterId;
this.uamToken = uamToken;
}
public RegisterApplicationMasterResponse getResponse() {
@ -1101,6 +1284,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
public SubClusterId getSubClusterId() {
return subClusterId;
}
public Token<AMRMTokenIdentifier> getUamToken() {
return uamToken;
}
}
/**

View File

@ -353,10 +353,6 @@ public class ContainerManagerImpl extends CompositeService implements
rsrcLocalizationSrvc.recoverLocalizedResources(
stateStore.loadLocalizationState());
if (this.amrmProxyEnabled) {
this.getAMRMProxyService().recover();
}
RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
for (ContainerManagerApplicationProto proto :
appsState.getApplications()) {
@ -373,6 +369,11 @@ public class ContainerManagerImpl extends CompositeService implements
recoverContainer(rcs);
}
// Recovery AMRMProxy state after apps and containers are recovered
if (this.amrmProxyEnabled) {
this.getAMRMProxyService().recover();
}
//Dispatching the RECOVERY_COMPLETED event through the dispatcher
//so that all the paused, scheduled and queued containers will
//be scheduled for execution on availability of resources.

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@ -179,6 +180,15 @@ public abstract class BaseAMRMProxyTest {
return new NMContext(null, null, null, null, stateStore, false, this.conf);
}
protected List<ContainerId> getCompletedContainerIds(
List<ContainerStatus> containerStatus) {
List<ContainerId> ret = new ArrayList<>();
for (ContainerStatus status : containerStatus) {
ret.add(status.getContainerId());
}
return ret;
}
/**
* This helper method will invoke the specified function in parallel for each
* end point in the specified list using a thread pool and return the
@ -623,7 +633,7 @@ public abstract class BaseAMRMProxyTest {
*/
public void initApp(ApplicationAttemptId applicationId, String user) {
super.initializePipeline(applicationId, user,
new Token<AMRMTokenIdentifier>(), null, null, false);
new Token<AMRMTokenIdentifier>(), null, null, false, null);
}
public void stopApp(ApplicationId applicationId) {

View File

@ -444,7 +444,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2);
getAMRMProxyService().initializePipeline(applicationAttemptId, user,
new Token<AMRMTokenIdentifier>(), null, null, false);
new Token<AMRMTokenIdentifier>(), null, null, false, null);
RequestInterceptorChainWrapper chain2 =
getAMRMProxyService().getPipelines().get(appId);
@ -531,16 +531,14 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
"new AMRMToken from RM should have been nulled by AMRMProxyService",
allocateResponse.getAMRMToken());
// The way the mock resource manager is setup, it will return the containers
// that were released in the response. This is done because the UAMs run
// asynchronously and we need to if all the resource managers received the
// release it. The containers sent by the mock resource managers will be
// We need to make sure all the resource managers received the
// release list. The containers sent by the mock resource managers will be
// aggregated and returned back to us and we can assert if all the release
// lists reached the sub-clusters
List<Container> containersForReleasedContainerIds =
new ArrayList<Container>();
containersForReleasedContainerIds.addAll(allocateResponse
.getAllocatedContainers());
List<ContainerId> containersForReleasedContainerIds = new ArrayList<>();
List<ContainerId> newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
@ -554,8 +552,9 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
"new AMRMToken from RM should have been nulled by AMRMProxyService",
allocateResponse.getAMRMToken());
containersForReleasedContainerIds.addAll(allocateResponse
.getAllocatedContainers());
newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
LOG.info("Number of containers received in this request: "
+ Integer.toString(allocateResponse.getAllocatedContainers()

View File

@ -19,16 +19,20 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -59,6 +63,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@ -79,7 +87,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
private TestableFederationInterceptor interceptor;
private MemoryFederationStateStore stateStore;
private NMStateStoreService nmStateStore;
private RegistryOperations registry;
private Context nmContext;
private int testAppId;
private ApplicationAttemptId attemptId;
@ -93,15 +104,28 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
getConf());
nmStateStore = new NMMemoryStateStoreService();
nmStateStore.init(getConf());
nmStateStore.start();
registry = new FSRegistryOperationsService();
registry.init(getConf());
registry.start();
testAppId = 1;
attemptId = getApplicationAttemptId(testAppId);
interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(),
attemptId, "test-user", null, null));
nmContext =
new NMContext(null, null, null, null, nmStateStore, false, getConf());
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(),
attemptId, "test-user", null, null, null, registry));
interceptor.cleanupRegistry();
}
@Override
public void tearDown() {
interceptor.cleanupRegistry();
interceptor.shutdown();
registry.stop();
super.tearDown();
}
@ -207,18 +231,17 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull(allocateResponse);
// The way the mock resource manager is setup, it will return the containers
// that were released in the allocated containers. The release request will
// be split and handled by the corresponding UAM. The release containers
// returned by the mock resource managers will be aggregated and returned
// back to us and we can check if total request size and returned size are
// the same
List<Container> containersForReleasedContainerIds =
new ArrayList<Container>();
containersForReleasedContainerIds
.addAll(allocateResponse.getAllocatedContainers());
// The release request will be split and handled by the corresponding UAM.
// The release containers returned by the mock resource managers will be
// aggregated and returned back to us and we can check if total request size
// and returned size are the same
List<ContainerId> containersForReleasedContainerIds =
new ArrayList<ContainerId>();
List<ContainerId> newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
LOG.info("Number of containers received in the original request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
+ Integer.toString(newlyFinished.size()));
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
@ -228,11 +251,12 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
allocateResponse =
interceptor.allocate(Records.newRecord(AllocateRequest.class));
Assert.assertNotNull(allocateResponse);
containersForReleasedContainerIds
.addAll(allocateResponse.getAllocatedContainers());
newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
LOG.info("Number of containers received in this request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
+ Integer.toString(newlyFinished.size()));
LOG.info("Total number of containers received: "
+ Integer.toString(containersForReleasedContainerIds.size()));
Thread.sleep(10);
@ -547,4 +571,74 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
Assert.assertEquals(1, response.getUpdatedContainers().size());
Assert.assertEquals(1, response.getUpdateErrors().size());
}
@Test
public void testSecondAttempt() throws Exception {
ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId);
userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate one batch of containers
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
for (Container c : containers) {
System.out.println(c.getId() + " ha");
}
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// Preserve the mock RM instances for secondaries
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
// Increase the attemptId and create a new intercepter instance for it
attemptId = ApplicationAttemptId.newInstance(
attemptId.getApplicationId(), attemptId.getAttemptId() + 1);
interceptor = new TestableFederationInterceptor(null, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
getConf(), attemptId, "test-user", null, null, null, registry));
registerResponse = interceptor.registerApplicationMaster(registerReq);
// Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(numberOfContainers,
registerResponse.getContainersFromPreviousAttempts().size());
// Release all containers
releaseContainersAndAssert(
registerResponse.getContainersFromPreviousAttempts());
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finshResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
return null;
}
});
}
}

View File

@ -44,6 +44,15 @@ public class TestableFederationInterceptor extends FederationInterceptor {
private AtomicInteger runningIndex = new AtomicInteger(0);
private MockResourceManagerFacade mockRm;
public TestableFederationInterceptor() {
}
public TestableFederationInterceptor(MockResourceManagerFacade homeRM,
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries) {
mockRm = homeRM;
secondaryResourceManagers = secondaries;
}
@Override
protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
ExecutorService threadPool) {
@ -68,7 +77,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
// We create one instance of the mock resource manager per sub cluster. Keep
// track of the instances of the RMs in the map keyed by the sub cluster id
synchronized (this.secondaryResourceManagers) {
if (this.secondaryResourceManagers.contains(subClusterId)) {
if (this.secondaryResourceManagers.containsKey(subClusterId)) {
return (T) this.secondaryResourceManagers.get(subClusterId);
} else {
// The running index here is used to simulate different RM_EPOCH to
@ -91,6 +100,15 @@ public class TestableFederationInterceptor extends FederationInterceptor {
}
}
protected MockResourceManagerFacade getHomeRM() {
return mockRm;
}
protected ConcurrentHashMap<String, MockResourceManagerFacade>
getSecondaryRMs() {
return secondaryResourceManagers;
}
/**
* Extends the UnmanagedAMPoolManager and overrides methods to provide a
* testable implementation of UnmanagedAMPoolManager.
@ -104,9 +122,9 @@ public class TestableFederationInterceptor extends FederationInterceptor {
@Override
public UnmanagedApplicationManager createUAM(Configuration conf,
ApplicationId appId, String queueName, String submitter,
String appNameSuffix) {
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
return new TestableUnmanagedApplicationManager(conf, appId, queueName,
submitter, appNameSuffix);
submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
}
}
@ -119,8 +137,9 @@ public class TestableFederationInterceptor extends FederationInterceptor {
public TestableUnmanagedApplicationManager(Configuration conf,
ApplicationId appId, String queueName, String submitter,
String appNameSuffix) {
super(conf, appId, queueName, submitter, appNameSuffix);
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
super(conf, appId, queueName, submitter, appNameSuffix,
keepContainersAcrossApplicationAttempts);
}
/**

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.service.AbstractService;
@ -951,9 +952,10 @@ public class MiniYARNCluster extends CompositeService {
protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
String user, Token<AMRMTokenIdentifier> amrmToken,
Token<AMRMTokenIdentifier> localToken,
Map<String, byte[]> recoveredDataMap, boolean isRecovery) {
Map<String, byte[]> recoveredDataMap, boolean isRecovery,
Credentials credentials) {
super.initializePipeline(applicationAttemptId, user, amrmToken,
localToken, recoveredDataMap, isRecovery);
localToken, recoveredDataMap, isRecovery, credentials);
RequestInterceptor rt = getPipelines()
.get(applicationAttemptId.getApplicationId()).getRootInterceptor();
// The DefaultRequestInterceptor will generally be the last

View File

@ -141,6 +141,8 @@ The figure shows a sequence diagram for the following job execution flow:
b. The security tokens are also modified by the NM when launching the AM, so that the AM can only talk with the AMRMProxy. Any future communication from AM to the YARN RM is mediated by the AMRMProxy.
7. The AM will then request containers using the locality information exposed by HDFS.
8. Based on a policy the AMRMProxy can impersonate the AM on other sub-clusters, by submitting an Unmanaged AM, and by forwarding the AM heartbeats to relevant sub-clusters.
a. Federation supports multiple application attempts with AMRMProxy HA. AM containers will have different attempt id in home sub-cluster, but the same Unmanaged AM in secondaries will be used across attempts.
b. When AMRMProxy HA is enabled, UAM token will be stored in Yarn Registry. In the registerApplicationMaster call of each application attempt, AMRMProxy will go fetch existing UAM tokens from registry (if any) and re-attached to the existing UAMs.
9. The AMRMProxy will use both locality information and a pluggable policy configured in the state-store to decide whether to forward the resource requests received by the AM to the Home RM or to one (or more) Secondary RMs. In Figure 1, we show the case in which the AMRMProxy decides to forward the request to the secondary RM.
10. The secondary RM will provide the AMRMProxy with valid container tokens to start a new container on some node in its sub-cluster. This mechanism ensures that each sub-cluster uses its own security tokens and avoids the need for a cluster wide shared secret to create tokens.
11. The AMRMProxy forwards the allocation response back to the AM.
@ -262,16 +264,17 @@ These are extra configurations that should appear in the **conf/yarn-site.xml**
| Property | Example | Description |
|:---- |:---- |
| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy is enabled.
|`yarn.nodemanager.amrmproxy.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor` | A comma-separated list of interceptors to be run at the amrmproxy. For federation the last step in the pipeline should be the FederationInterceptor.
| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy is enabled. |
| `yarn.nodemanager.amrmproxy.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor` | A comma-separated list of interceptors to be run at the amrmproxy. For federation the last step in the pipeline should be the FederationInterceptor. |
| `yarn.client.failover-proxy-provider` | `org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider` | The class used to connect to the RMs by looking up the membership information in federation state-store. This must be set if federation is enabled, even if RM HA is not enabled.|
Optional:
| Property | Example | Description |
|:---- |:---- |
|`yarn.federation.statestore.max-connections` | `1` | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly. |
|`yarn.federation.cache-ttl.secs` | `300` | The time to leave for the AMRMProxy cache. Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. |
| `yarn.nodemanager.amrmproxy.ha.enable` | `true` | Whether or not the AMRMProxy HA is enabled for multiple application attempt suppport. |
| `yarn.federation.statestore.max-connections` | `1` | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly. |
| `yarn.federation.cache-ttl.secs` | `300` | The time to leave for the AMRMProxy cache. Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. |
Running a Sample Job
--------------------