YARN-11029. Refactor AMRMProxy Service code and Added Some Metrics. (#4650)

This commit is contained in:
slfan1989 2022-08-04 00:38:00 +08:00 committed by GitHub
parent c5eba323bc
commit 6463f86f83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 304 additions and 169 deletions

View File

@ -45,6 +45,16 @@ public final class AMRMProxyMetrics {
private MutableGaugeLong failedAllocateRequests; private MutableGaugeLong failedAllocateRequests;
@Metric("# of failed application recoveries") @Metric("# of failed application recoveries")
private MutableGaugeLong failedAppRecoveryCount; private MutableGaugeLong failedAppRecoveryCount;
@Metric("# of failed application stop")
private MutableGaugeLong failedAppStopRequests;
@Metric("# of failed update token")
private MutableGaugeLong failedUpdateAMRMTokenRequests;
@Metric("# all allocate requests count")
private MutableGaugeLong allocateCount;
@Metric("# all requests count")
private MutableGaugeLong requestCount;
// Aggregate metrics are shared, and don't have to be looked up per call // Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Application start request latency(ms)") @Metric("Application start request latency(ms)")
private MutableRate totalSucceededAppStartRequests; private MutableRate totalSucceededAppStartRequests;
@ -54,11 +64,22 @@ public final class AMRMProxyMetrics {
private MutableRate totalSucceededFinishAMRequests; private MutableRate totalSucceededFinishAMRequests;
@Metric("Allocate latency(ms)") @Metric("Allocate latency(ms)")
private MutableRate totalSucceededAllocateRequests; private MutableRate totalSucceededAllocateRequests;
@Metric("Application stop request latency(ms)")
private MutableRate totalSucceededAppStopRequests;
@Metric("Recover latency(ms)")
private MutableRate totalSucceededRecoverRequests;
@Metric("UpdateAMRMToken latency(ms)")
private MutableRate totalSucceededUpdateAMRMTokenRequests;
// Quantile latency in ms - this is needed for SLA (95%, 99%, etc) // Quantile latency in ms - this is needed for SLA (95%, 99%, etc)
private MutableQuantiles applicationStartLatency; private MutableQuantiles applicationStartLatency;
private MutableQuantiles registerAMLatency; private MutableQuantiles registerAMLatency;
private MutableQuantiles finishAMLatency; private MutableQuantiles finishAMLatency;
private MutableQuantiles allocateLatency; private MutableQuantiles allocateLatency;
private MutableQuantiles recoverLatency;
private MutableQuantiles applicationStopLatency;
private MutableQuantiles updateAMRMTokenLatency;
private static volatile AMRMProxyMetrics instance = null; private static volatile AMRMProxyMetrics instance = null;
private MetricsRegistry registry; private MetricsRegistry registry;
@ -78,6 +99,15 @@ public final class AMRMProxyMetrics {
allocateLatency = registry allocateLatency = registry
.newQuantiles("allocateLatency", "latency of allocate", "ops", .newQuantiles("allocateLatency", "latency of allocate", "ops",
"latency", 10); "latency", 10);
applicationStopLatency = registry
.newQuantiles("applicationStopLatency", "latency of app stop", "ops",
"latency", 10);
recoverLatency = registry
.newQuantiles("recoverLatency", "latency of recover", "ops",
"latency", 10);
updateAMRMTokenLatency = registry
.newQuantiles("updateAMRMTokenLatency", "latency of update amrm token", "ops",
"latency", 10);
} }
/** /**
@ -146,16 +176,57 @@ public final class AMRMProxyMetrics {
return totalSucceededAllocateRequests.lastStat().numSamples(); return totalSucceededAllocateRequests.lastStat().numSamples();
} }
/**
 * Reports how many samples the last stat of the application-stop latency
 * rate ({@code totalSucceededAppStopRequests}) contains.
 *
 * @return the sample count of the application-stop rate's last stat
 */
@VisibleForTesting
long getNumSucceededAppStopRequests() {
  final long sampleCount = this.totalSucceededAppStopRequests.lastStat().numSamples();
  return sampleCount;
}
/**
 * Reports how many samples the last stat of the recover latency rate
 * ({@code totalSucceededRecoverRequests}) contains.
 *
 * @return the sample count of the recover rate's last stat
 */
@VisibleForTesting
long getNumSucceededRecoverRequests() {
  final long sampleCount = this.totalSucceededRecoverRequests.lastStat().numSamples();
  return sampleCount;
}
/**
 * Reports how many samples the last stat of the UpdateAMRMToken latency
 * rate ({@code totalSucceededUpdateAMRMTokenRequests}) contains.
 *
 * @return the sample count of the UpdateAMRMToken rate's last stat
 */
@VisibleForTesting
long getNumSucceededUpdateAMRMTokenRequests() {
  final long sampleCount =
      this.totalSucceededUpdateAMRMTokenRequests.lastStat().numSamples();
  return sampleCount;
}
@VisibleForTesting @VisibleForTesting
double getLatencySucceededAllocateRequests() { double getLatencySucceededAllocateRequests() {
return totalSucceededAllocateRequests.lastStat().mean(); return totalSucceededAllocateRequests.lastStat().mean();
} }
/**
 * Reports the mean of the last stat of the application-stop latency rate
 * ({@code totalSucceededAppStopRequests}).
 *
 * @return the mean value of the application-stop rate's last stat
 */
@VisibleForTesting
double getLatencySucceededAppStopRequests() {
  final double meanLatency = this.totalSucceededAppStopRequests.lastStat().mean();
  return meanLatency;
}
/**
 * Reports the mean of the last stat of the recover latency rate
 * ({@code totalSucceededRecoverRequests}).
 *
 * @return the mean value of the recover rate's last stat
 */
@VisibleForTesting
double getLatencySucceededRecoverRequests() {
  final double meanLatency = this.totalSucceededRecoverRequests.lastStat().mean();
  return meanLatency;
}
public void succeededAllocateRequests(long duration) { public void succeededAllocateRequests(long duration) {
totalSucceededAllocateRequests.add(duration); totalSucceededAllocateRequests.add(duration);
allocateLatency.add(duration); allocateLatency.add(duration);
} }
/**
 * Records one successful application stop by adding its duration to both
 * the aggregate rate and the quantile estimator for application stops.
 *
 * @param duration the value to record; the rate is declared as
 *                 "Application stop request latency(ms)"
 */
public void succeededAppStopRequests(long duration) {
  // Aggregate rate first, then the quantiles used for SLA percentiles.
  this.totalSucceededAppStopRequests.add(duration);
  this.applicationStopLatency.add(duration);
}
/**
 * Records one successful recover by adding its duration to both the
 * aggregate rate and the quantile estimator for recoveries.
 *
 * @param duration the value to record; the rate is declared as
 *                 "Recover latency(ms)"
 */
public void succeededRecoverRequests(long duration) {
  // Aggregate rate first, then the quantiles used for SLA percentiles.
  this.totalSucceededRecoverRequests.add(duration);
  this.recoverLatency.add(duration);
}
/**
 * Records one successful AMRM token update by adding its duration to both
 * the aggregate rate and the quantile estimator for token updates.
 *
 * @param duration the value to record; the rate is declared as
 *                 "UpdateAMRMToken latency(ms)"
 */
public void succeededUpdateTokenRequests(long duration) {
  // Aggregate rate first, then the quantiles used for SLA percentiles.
  this.totalSucceededUpdateAMRMTokenRequests.add(duration);
  this.updateAMRMTokenLatency.add(duration);
}
long getFailedAppStartRequests() { long getFailedAppStartRequests() {
return failedAppStartRequests.value(); return failedAppStartRequests.value();
} }
@ -195,4 +266,36 @@ public final class AMRMProxyMetrics {
public void incrFailedAppRecoveryCount() { public void incrFailedAppRecoveryCount() {
failedAppRecoveryCount.incr(); failedAppRecoveryCount.incr();
} }
/**
 * Reads the current value of the failed application-stop gauge.
 *
 * @return the value held by {@code failedAppStopRequests}
 */
long getFailedAppStopRequests() {
  final long failures = this.failedAppStopRequests.value();
  return failures;
}
/**
 * Increments the gauge that tracks failed application stops
 * ({@code failedAppStopRequests}).
 */
public void incrFailedAppStopRequests() {
  this.failedAppStopRequests.incr();
}
/**
 * Reads the current value of the failed AMRM-token-update gauge.
 *
 * @return the value held by {@code failedUpdateAMRMTokenRequests}
 */
long getFailedUpdateAMRMTokenRequests() {
  final long failures = this.failedUpdateAMRMTokenRequests.value();
  return failures;
}
/**
 * Increments the gauge that tracks failed AMRM token updates
 * ({@code failedUpdateAMRMTokenRequests}).
 */
public void incrFailedUpdateAMRMTokenRequests() {
  this.failedUpdateAMRMTokenRequests.incr();
}
/**
 * Increments the gauge that counts allocate requests
 * ({@code allocateCount}).
 */
public void incrAllocateCount() {
  this.allocateCount.incr();
}
/**
 * Increments the gauge that counts requests ({@code requestCount}).
 */
public void incrRequestCount() {
  this.requestCount.incr();
}
/**
 * Reads the current value of the allocate request gauge.
 *
 * @return the value held by {@code allocateCount}
 */
long getAllocateCount() {
  final long total = this.allocateCount.value();
  return total;
}
/**
 * Reads the current value of the request gauge.
 *
 * @return the value held by {@code requestCount}
 */
long getRequestCount() {
  final long total = this.requestCount.value();
  return total;
}
} }

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -123,12 +125,9 @@ public class AMRMProxyService extends CompositeService implements
Preconditions.checkArgument(dispatcher != null, "dispatcher is null"); Preconditions.checkArgument(dispatcher != null, "dispatcher is null");
this.nmContext = nmContext; this.nmContext = nmContext;
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
this.applPipelineMap = this.applPipelineMap = new ConcurrentHashMap<>();
new ConcurrentHashMap<ApplicationId, RequestInterceptorChainWrapper>();
this.dispatcher.register(ApplicationEventType.class,
new ApplicationEventHandler());
this.dispatcher.register(ApplicationEventType.class, new ApplicationEventHandler());
metrics = AMRMProxyMetrics.getMetrics(); metrics = AMRMProxyMetrics.getMetrics();
} }
@ -155,7 +154,7 @@ public class AMRMProxyService extends CompositeService implements
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
LOG.info("Starting AMRMProxyService"); LOG.info("Starting AMRMProxyService.");
Configuration conf = getConfig(); Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf); YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
@ -182,27 +181,22 @@ public class AMRMProxyService extends CompositeService implements
listenerEndpoint, serverConf, this.secretManager, listenerEndpoint, serverConf, this.secretManager,
numWorkerThreads); numWorkerThreads);
if (conf if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, this.server.refreshServiceAcl(conf, NMPolicyProvider.getInstance());
false)) {
this.server.refreshServiceAcl(conf, NMPolicyProvider.getInstance());
} }
this.server.start(); this.server.start();
LOG.info("AMRMProxyService listening on address: " LOG.info("AMRMProxyService listening on address: {}.", this.server.getListenerAddress());
+ this.server.getListenerAddress());
super.serviceStart(); super.serviceStart();
} }
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
LOG.info("Stopping AMRMProxyService"); LOG.info("Stopping AMRMProxyService.");
if (this.server != null) { if (this.server != null) {
this.server.stop(); this.server.stop();
} }
this.secretManager.stop(); this.secretManager.stop();
super.serviceStop(); super.serviceStop();
} }
@ -212,19 +206,21 @@ public class AMRMProxyService extends CompositeService implements
* @throws IOException if recover fails * @throws IOException if recover fails
*/ */
public void recover() throws IOException { public void recover() throws IOException {
LOG.info("Recovering AMRMProxyService"); LOG.info("Recovering AMRMProxyService.");
RecoveredAMRMProxyState state = RecoveredAMRMProxyState state =
this.nmContext.getNMStateStore().loadAMRMProxyState(); this.nmContext.getNMStateStore().loadAMRMProxyState();
this.secretManager.recover(state); this.secretManager.recover(state);
LOG.info("Recovering {} running applications for AMRMProxy", LOG.info("Recovering {} running applications for AMRMProxy.",
state.getAppContexts().size()); state.getAppContexts().size());
for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry : state for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry : state
.getAppContexts().entrySet()) { .getAppContexts().entrySet()) {
ApplicationAttemptId attemptId = entry.getKey(); ApplicationAttemptId attemptId = entry.getKey();
LOG.info("Recovering app attempt {}", attemptId); LOG.info("Recovering app attempt {}.", attemptId);
long startTime = clock.getTime();
// Try recover for the running application attempt // Try recover for the running application attempt
try { try {
@ -233,19 +229,18 @@ public class AMRMProxyService extends CompositeService implements
for (Map.Entry<String, byte[]> contextEntry : entry.getValue() for (Map.Entry<String, byte[]> contextEntry : entry.getValue()
.entrySet()) { .entrySet()) {
if (contextEntry.getKey().equals(NMSS_USER_KEY)) { if (contextEntry.getKey().equals(NMSS_USER_KEY)) {
user = new String(contextEntry.getValue(), "UTF-8"); user = new String(contextEntry.getValue(), StandardCharsets.UTF_8);
} else if (contextEntry.getKey().equals(NMSS_AMRMTOKEN_KEY)) { } else if (contextEntry.getKey().equals(NMSS_AMRMTOKEN_KEY)) {
amrmToken = new Token<>(); amrmToken = new Token<>();
amrmToken.decodeFromUrlString( amrmToken.decodeFromUrlString(
new String(contextEntry.getValue(), "UTF-8")); new String(contextEntry.getValue(), StandardCharsets.UTF_8));
// Clear the service field, as if RM just issued the token // Clear the service field, as if RM just issued the token
amrmToken.setService(new Text()); amrmToken.setService(new Text());
} }
} }
if (amrmToken == null) { if (amrmToken == null) {
throw new IOException( throw new IOException("No amrmToken found for app attempt " + attemptId);
"No amrmToken found for app attempt " + attemptId);
} }
if (user == null) { if (user == null) {
throw new IOException("No user found for app attempt " + attemptId); throw new IOException("No user found for app attempt " + attemptId);
@ -258,14 +253,14 @@ public class AMRMProxyService extends CompositeService implements
// Retrieve the AM container credentials from NM context // Retrieve the AM container credentials from NM context
Credentials amCred = null; Credentials amCred = null;
for (Container container : this.nmContext.getContainers().values()) { for (Container container : this.nmContext.getContainers().values()) {
LOG.debug("From NM Context container {}", container.getContainerId()); LOG.debug("From NM Context container {}.", container.getContainerId());
if (container.getContainerId().getApplicationAttemptId().equals( if (container.getContainerId().getApplicationAttemptId().equals(
attemptId) && container.getContainerTokenIdentifier() != null) { attemptId) && container.getContainerTokenIdentifier() != null) {
LOG.debug("Container type {}", LOG.debug("Container type {}.",
container.getContainerTokenIdentifier().getContainerType()); container.getContainerTokenIdentifier().getContainerType());
if (container.getContainerTokenIdentifier() if (container.getContainerTokenIdentifier()
.getContainerType() == ContainerType.APPLICATION_MASTER) { .getContainerType() == ContainerType.APPLICATION_MASTER) {
LOG.info("AM container {} found in context, has credentials: {}", LOG.info("AM container {} found in context, has credentials: {}.",
container.getContainerId(), container.getContainerId(),
(container.getCredentials() != null)); (container.getCredentials() != null));
amCred = container.getCredentials(); amCred = container.getCredentials();
@ -274,15 +269,17 @@ public class AMRMProxyService extends CompositeService implements
} }
if (amCred == null) { if (amCred == null) {
LOG.error("No credentials found for AM container of {}. " LOG.error("No credentials found for AM container of {}. "
+ "Yarn registry access might not work", attemptId); + "Yarn registry access might not work.", attemptId);
} }
// Create the intercepter pipeline for the AM // Create the interceptor pipeline for the AM
initializePipeline(attemptId, user, amrmToken, localToken, initializePipeline(attemptId, user, amrmToken, localToken,
entry.getValue(), true, amCred); entry.getValue(), true, amCred);
long endTime = clock.getTime();
this.metrics.succeededRecoverRequests(endTime - startTime);
} catch (Throwable e) { } catch (Throwable e) {
LOG.error("Exception when recovering " + attemptId LOG.error("Exception when recovering {}, removing it from NMStateStore and move on.",
+ ", removing it from NMStateStore and move on", e); attemptId, e);
this.metrics.incrFailedAppRecoveryCount(); this.metrics.incrFailedAppRecoveryCount();
this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId); this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
} }
@ -292,26 +289,28 @@ public class AMRMProxyService extends CompositeService implements
/** /**
* This is called by the AMs started on this node to register with the RM. * This is called by the AMs started on this node to register with the RM.
* This method does the initial authorization and then forwards the request to * This method does the initial authorization and then forwards the request to
* the application instance specific intercepter chain. * the application instance specific interceptor chain.
*/ */
@Override @Override
public RegisterApplicationMasterResponse registerApplicationMaster( public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException, RegisterApplicationMasterRequest request) throws YarnException,
IOException { IOException {
this.metrics.incrRequestCount();
long startTime = clock.getTime(); long startTime = clock.getTime();
try { try {
RequestInterceptorChainWrapper pipeline = RequestInterceptorChainWrapper pipeline =
authorizeAndGetInterceptorChain(); authorizeAndGetInterceptorChain();
LOG.info("Registering application master." + " Host:" + request.getHost()
+ " Port:" + request.getRpcPort() + " Tracking Url:" + request LOG.info("RegisteringAM Host: {}, Port: {}, Tracking Url: {} for application {}. ",
.getTrackingUrl() + " for application " + pipeline request.getHost(), request.getRpcPort(), request.getTrackingUrl(),
.getApplicationAttemptId()); pipeline.getApplicationAttemptId());
RegisterApplicationMasterResponse response = RegisterApplicationMasterResponse response =
pipeline.getRootInterceptor().registerApplicationMaster(request); pipeline.getRootInterceptor().registerApplicationMaster(request);
long endTime = clock.getTime(); long endTime = clock.getTime();
this.metrics.succeededRegisterAMRequests(endTime - startTime); this.metrics.succeededRegisterAMRequests(endTime - startTime);
LOG.info("RegisterAM processing finished in {} ms for application {}", LOG.info("RegisterAM processing finished in {} ms for application {}.",
endTime - startTime, pipeline.getApplicationAttemptId()); endTime - startTime, pipeline.getApplicationAttemptId());
return response; return response;
} catch (Throwable t) { } catch (Throwable t) {
@ -323,24 +322,25 @@ public class AMRMProxyService extends CompositeService implements
/** /**
* This is called by the AMs started on this node to unregister from the RM. * This is called by the AMs started on this node to unregister from the RM.
* This method does the initial authorization and then forwards the request to * This method does the initial authorization and then forwards the request to
* the application instance specific intercepter chain. * the application instance specific interceptor chain.
*/ */
@Override @Override
public FinishApplicationMasterResponse finishApplicationMaster( public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnException, FinishApplicationMasterRequest request) throws YarnException,
IOException { IOException {
this.metrics.incrRequestCount();
long startTime = clock.getTime(); long startTime = clock.getTime();
try { try {
RequestInterceptorChainWrapper pipeline = RequestInterceptorChainWrapper pipeline =
authorizeAndGetInterceptorChain(); authorizeAndGetInterceptorChain();
LOG.info("Finishing application master for {}. Tracking Url: {}", LOG.info("Finishing application master for {}. Tracking Url: {}.",
pipeline.getApplicationAttemptId(), request.getTrackingUrl()); pipeline.getApplicationAttemptId(), request.getTrackingUrl());
FinishApplicationMasterResponse response = FinishApplicationMasterResponse response =
pipeline.getRootInterceptor().finishApplicationMaster(request); pipeline.getRootInterceptor().finishApplicationMaster(request);
long endTime = clock.getTime(); long endTime = clock.getTime();
this.metrics.succeededFinishAMRequests(endTime - startTime); this.metrics.succeededFinishAMRequests(endTime - startTime);
LOG.info("FinishAM finished with isUnregistered = {} in {} ms for {}", LOG.info("FinishAM finished with isUnregistered = {} in {} ms for {}.",
response.getIsUnregistered(), endTime - startTime, response.getIsUnregistered(), endTime - startTime,
pipeline.getApplicationAttemptId()); pipeline.getApplicationAttemptId());
return response; return response;
@ -354,12 +354,13 @@ public class AMRMProxyService extends CompositeService implements
* This is called by the AMs started on this node to send heart beat to RM. * This is called by the AMs started on this node to send heart beat to RM.
* This method does the initial authorization and then forwards the request to * This method does the initial authorization and then forwards the request to
* the application instance specific pipeline, which is a chain of request * the application instance specific pipeline, which is a chain of request
* intercepter objects. One application request processing pipeline is created * interceptor objects. One application request processing pipeline is created
* per AM instance. * per AM instance.
*/ */
@Override @Override
public AllocateResponse allocate(AllocateRequest request) public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException { throws YarnException, IOException {
this.metrics.incrAllocateCount();
long startTime = clock.getTime(); long startTime = clock.getTime();
try { try {
AMRMTokenIdentifier amrmTokenIdentifier = AMRMTokenIdentifier amrmTokenIdentifier =
@ -373,7 +374,7 @@ public class AMRMProxyService extends CompositeService implements
long endTime = clock.getTime(); long endTime = clock.getTime();
this.metrics.succeededAllocateRequests(endTime - startTime); this.metrics.succeededAllocateRequests(endTime - startTime);
LOG.info("Allocate processing finished in {} ms for application {}", LOG.info("Allocate processing finished in {} ms for application {}.",
endTime - startTime, pipeline.getApplicationAttemptId()); endTime - startTime, pipeline.getApplicationAttemptId());
return allocateResponse; return allocateResponse;
} catch (Throwable t) { } catch (Throwable t) {
@ -392,6 +393,7 @@ public class AMRMProxyService extends CompositeService implements
*/ */
public void processApplicationStartRequest(StartContainerRequest request) public void processApplicationStartRequest(StartContainerRequest request)
throws IOException, YarnException { throws IOException, YarnException {
this.metrics.incrRequestCount();
long startTime = clock.getTime(); long startTime = clock.getTime();
try { try {
ContainerTokenIdentifier containerTokenIdentifierForKey = ContainerTokenIdentifier containerTokenIdentifierForKey =
@ -408,8 +410,7 @@ public class AMRMProxyService extends CompositeService implements
if (!checkIfAppExistsInStateStore(applicationID)) { if (!checkIfAppExistsInStateStore(applicationID)) {
return; return;
} }
LOG.info("Callback received for initializing request " LOG.info("Callback received for initializing request processing pipeline for an AM.");
+ "processing pipeline for an AM");
Credentials credentials = YarnServerSecurityUtils Credentials credentials = YarnServerSecurityUtils
.parseCredentials(request.getContainerLaunchContext()); .parseCredentials(request.getContainerLaunchContext());
@ -417,8 +418,7 @@ public class AMRMProxyService extends CompositeService implements
getFirstAMRMToken(credentials.getAllTokens()); getFirstAMRMToken(credentials.getAllTokens());
if (amrmToken == null) { if (amrmToken == null) {
throw new YarnRuntimeException( throw new YarnRuntimeException(
"AMRMToken not found in the start container request for " "AMRMToken not found in the start container request for application:" + appAttemptId);
+ "application:" + appAttemptId.toString());
} }
// Substitute the existing AMRM Token with a local one. Keep the rest of // Substitute the existing AMRM Token with a local one. Keep the rest of
@ -445,7 +445,7 @@ public class AMRMProxyService extends CompositeService implements
} }
/** /**
* Initializes the request intercepter pipeline for the specified application. * Initializes the request interceptor pipeline for the specified application.
* *
* @param applicationAttemptId attempt id * @param applicationAttemptId attempt id
* @param user user name * @param user user name
@ -465,8 +465,7 @@ public class AMRMProxyService extends CompositeService implements
.containsKey(applicationAttemptId.getApplicationId())) { .containsKey(applicationAttemptId.getApplicationId())) {
LOG.warn("Request to start an already existing appId was received. " LOG.warn("Request to start an already existing appId was received. "
+ " This can happen if an application failed and a new attempt " + " This can happen if an application failed and a new attempt "
+ "was created on this machine. ApplicationId: " + "was created on this machine. ApplicationId: {}.", applicationAttemptId);
+ applicationAttemptId.toString());
RequestInterceptorChainWrapper chainWrapperBackup = RequestInterceptorChainWrapper chainWrapperBackup =
this.applPipelineMap.get(applicationAttemptId.getApplicationId()); this.applPipelineMap.get(applicationAttemptId.getApplicationId());
@ -476,8 +475,7 @@ public class AMRMProxyService extends CompositeService implements
.equals(applicationAttemptId)) { .equals(applicationAttemptId)) {
// TODO: revisit in AMRMProxy HA in YARN-6128 // TODO: revisit in AMRMProxy HA in YARN-6128
// Remove the existing pipeline // Remove the existing pipeline
LOG.info("Remove the previous pipeline for ApplicationId: " LOG.info("Remove the previous pipeline for ApplicationId: {}.", applicationAttemptId);
+ applicationAttemptId.toString());
RequestInterceptorChainWrapper pipeline = RequestInterceptorChainWrapper pipeline =
applPipelineMap.remove(applicationAttemptId.getApplicationId()); applPipelineMap.remove(applicationAttemptId.getApplicationId());
@ -485,19 +483,17 @@ public class AMRMProxyService extends CompositeService implements
try { try {
this.nmContext.getNMStateStore() this.nmContext.getNMStateStore()
.removeAMRMProxyAppContext(applicationAttemptId); .removeAMRMProxyAppContext(applicationAttemptId);
} catch (IOException e) { } catch (IOException ioe) {
LOG.error("Error removing AMRMProxy application context for " LOG.error("Error removing AMRMProxy application context for {}.",
+ applicationAttemptId, e); applicationAttemptId, ioe);
} }
} }
try { try {
pipeline.getRootInterceptor().shutdown(); pipeline.getRootInterceptor().shutdown();
} catch (Throwable ex) { } catch (Throwable ex) {
LOG.warn( LOG.warn("Failed to shutdown the request processing pipeline for app: {}.",
"Failed to shutdown the request processing pipeline for app:" applicationAttemptId.getApplicationId(), ex);
+ applicationAttemptId.getApplicationId(),
ex);
} }
} else { } else {
return; return;
@ -510,12 +506,11 @@ public class AMRMProxyService extends CompositeService implements
} }
// We register the pipeline instance in the map first and then initialize it // We register the pipeline instance in the map first and then initialize it
// later because chain initialization can be expensive and we would like to // later because chain initialization can be expensive, and we would like to
// release the lock as soon as possible to prevent other applications from // release the lock as soon as possible to prevent other applications from
// blocking when one application's chain is initializing // blocking when one application's chain is initializing
LOG.info("Initializing request processing pipeline for application. " LOG.info("Initializing request processing pipeline for application. "
+ " ApplicationId:" + applicationAttemptId + " for the user: " + " ApplicationId: {} for the user: {}.", applicationAttemptId, user);
+ user);
try { try {
RequestInterceptor interceptorChain = RequestInterceptor interceptorChain =
@ -525,8 +520,7 @@ public class AMRMProxyService extends CompositeService implements
user, amrmToken, localToken, credentials, this.registry)); user, amrmToken, localToken, credentials, this.registry));
if (isRecovery) { if (isRecovery) {
if (recoveredDataMap == null) { if (recoveredDataMap == null) {
throw new YarnRuntimeException( throw new YarnRuntimeException("null recoveredDataMap received for recover");
"null recoveredDataMap recieved for recover");
} }
interceptorChain.recover(recoveredDataMap); interceptorChain.recover(recoveredDataMap);
} }
@ -535,13 +529,13 @@ public class AMRMProxyService extends CompositeService implements
if (!isRecovery && this.nmContext.getNMStateStore() != null) { if (!isRecovery && this.nmContext.getNMStateStore() != null) {
try { try {
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
applicationAttemptId, NMSS_USER_KEY, user.getBytes("UTF-8")); applicationAttemptId, NMSS_USER_KEY, user.getBytes(StandardCharsets.UTF_8));
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
applicationAttemptId, NMSS_AMRMTOKEN_KEY, applicationAttemptId, NMSS_AMRMTOKEN_KEY,
amrmToken.encodeToUrlString().getBytes("UTF-8")); amrmToken.encodeToUrlString().getBytes(StandardCharsets.UTF_8));
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error storing AMRMProxy application context entry for " LOG.error("Error storing AMRMProxy application context entry for {}.",
+ applicationAttemptId, e); applicationAttemptId, e);
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -557,29 +551,27 @@ public class AMRMProxyService extends CompositeService implements
* @param applicationId application id * @param applicationId application id
*/ */
protected void stopApplication(ApplicationId applicationId) { protected void stopApplication(ApplicationId applicationId) {
Preconditions.checkArgument(applicationId != null, this.metrics.incrRequestCount();
"applicationId is null"); Preconditions.checkArgument(applicationId != null, "applicationId is null");
RequestInterceptorChainWrapper pipeline = RequestInterceptorChainWrapper pipeline =
this.applPipelineMap.remove(applicationId); this.applPipelineMap.remove(applicationId);
boolean isStopSuccess = true;
long startTime = clock.getTime();
if (pipeline == null) { if (pipeline == null) {
LOG.info( LOG.info("No interceptor pipeline for application {},"
"No interceptor pipeline for application {}," + " likely because its AM is not run in this node.", applicationId);
+ " likely because its AM is not run in this node.", isStopSuccess = false;
applicationId);
} else { } else {
// Remove the appAttempt in AMRMTokenSecretManager // Remove the appAttempt in AMRMTokenSecretManager
this.secretManager this.secretManager.applicationMasterFinished(pipeline.getApplicationAttemptId());
.applicationMasterFinished(pipeline.getApplicationAttemptId()); LOG.info("Stopping the request processing pipeline for application: {}.", applicationId);
LOG.info("Stopping the request processing pipeline for application: "
+ applicationId);
try { try {
pipeline.getRootInterceptor().shutdown(); pipeline.getRootInterceptor().shutdown();
} catch (Throwable ex) { } catch (Throwable ex) {
LOG.warn( LOG.warn("Failed to shutdown the request processing pipeline for app: {}.",
"Failed to shutdown the request processing pipeline for app:" applicationId, ex);
+ applicationId, ex); isStopSuccess = false;
} }
// Remove the app context from NMSS after the interceptors are shutdown // Remove the app context from NMSS after the interceptors are shutdown
@ -588,74 +580,83 @@ public class AMRMProxyService extends CompositeService implements
this.nmContext.getNMStateStore() this.nmContext.getNMStateStore()
.removeAMRMProxyAppContext(pipeline.getApplicationAttemptId()); .removeAMRMProxyAppContext(pipeline.getApplicationAttemptId());
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error removing AMRMProxy application context for " LOG.error("Error removing AMRMProxy application context for {}.",
+ applicationId, e); applicationId, e);
isStopSuccess = false;
} }
} }
} }
if (isStopSuccess) {
long endTime = clock.getTime();
this.metrics.succeededAppStopRequests(endTime - startTime);
} else {
this.metrics.incrFailedAppStopRequests();
}
} }
private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
RequestInterceptorChainWrapper pipeline, RequestInterceptorChainWrapper pipeline,
AllocateResponse allocateResponse) { AllocateResponse allocateResponse) {
AMRMProxyApplicationContextImpl context = AMRMProxyApplicationContextImpl context =
(AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor() (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor().getApplicationContext();
.getApplicationContext();
// check to see if the RM has issued a new AMRMToken & accordingly update try {
// the real ARMRMToken in the current context long startTime = clock.getTime();
if (allocateResponse.getAMRMToken() != null) {
LOG.info("RM rolled master-key for amrm-tokens");
org.apache.hadoop.yarn.api.records.Token token = // check to see if the RM has issued a new AMRMToken & accordingly update
allocateResponse.getAMRMToken(); // the real ARMRMToken in the current context
if (allocateResponse.getAMRMToken() != null) {
LOG.info("RM rolled master-key for amrm-tokens.");
// Do not propagate this info back to AM org.apache.hadoop.yarn.api.records.Token token = allocateResponse.getAMRMToken();
allocateResponse.setAMRMToken(null);
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken = // Do not propagate this info back to AM
ConverterUtils.convertFromYarn(token, (Text) null); allocateResponse.setAMRMToken(null);
// Update the AMRMToken in context map, and in NM state store if it is org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
// different ConverterUtils.convertFromYarn(token, (Text) null);
if (context.setAMRMToken(newToken)
&& this.nmContext.getNMStateStore() != null) { // Update the AMRMToken in context map, and in NM state store if it is
try { // different
if (context.setAMRMToken(newToken) && this.nmContext.getNMStateStore() != null) {
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY, context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY,
newToken.encodeToUrlString().getBytes("UTF-8")); newToken.encodeToUrlString().getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
LOG.error("Error storing AMRMProxy application context entry for "
+ context.getApplicationAttemptId(), e);
} }
} }
}
// Check if the local AMRMToken is rolled up and update the context and // Check if the local AMRMToken is rolled up and update the context and
// response accordingly // response accordingly
MasterKeyData nextMasterKey = MasterKeyData nextMasterKey = this.secretManager.getNextMasterKeyData();
this.secretManager.getNextMasterKeyData();
if (nextMasterKey != null if (nextMasterKey != null) {
&& nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier MasterKey masterKey = nextMasterKey.getMasterKey();
.getKeyId()) { if (masterKey.getKeyId() != amrmTokenIdentifier.getKeyId()) {
Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken(); Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken();
if (nextMasterKey.getMasterKey().getKeyId() != context if (masterKey.getKeyId() != context.getLocalAMRMTokenKeyId()) {
.getLocalAMRMTokenKeyId()) { LOG.info("The local AMRMToken has been rolled-over."
LOG.info("The local AMRMToken has been rolled-over." + " Send new local AMRMToken back to application: {}",
+ " Send new local AMRMToken back to application: " pipeline.getApplicationId());
+ pipeline.getApplicationId()); localToken = this.secretManager.createAndGetAMRMToken(
localToken = pipeline.getApplicationAttemptId());
this.secretManager.createAndGetAMRMToken(pipeline context.setLocalAMRMToken(localToken);
.getApplicationAttemptId()); }
context.setLocalAMRMToken(localToken);
allocateResponse
.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
.newInstance(localToken.getIdentifier(), localToken
.getKind().toString(), localToken.getPassword(),
localToken.getService().toString()));
}
} }
long endTime = clock.getTime();
allocateResponse this.metrics.succeededUpdateTokenRequests(endTime - startTime);
.setAMRMToken(org.apache.hadoop.yarn.api.records.Token } catch (IOException e) {
.newInstance(localToken.getIdentifier(), localToken LOG.error("Error storing AMRMProxy application context entry for {}.",
.getKind().toString(), localToken.getPassword(), context.getApplicationAttemptId(), e);
localToken.getService().toString())); this.metrics.incrFailedUpdateAMRMTokenRequests();
} }
} }
@ -672,19 +673,19 @@ public class AMRMProxyService extends CompositeService implements
} }
/** /**
* Gets the Request intercepter chains for all the applications. * Gets the Request interceptor chains for all the applications.
* *
* @return the request intercepter chains. * @return the request interceptor chains.
*/ */
protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() { protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() {
return this.applPipelineMap; return this.applPipelineMap;
} }
/** /**
* This method creates and returns reference of the first intercepter in the * This method creates and returns reference of the first interceptor in the
* chain of request intercepter instances. * chain of request interceptor instances.
* *
* @return the reference of the first intercepter in the chain * @return the reference of the first interceptor in the chain
*/ */
protected RequestInterceptor createRequestInterceptorChain() { protected RequestInterceptor createRequestInterceptorChain() {
Configuration conf = getConfig(); Configuration conf = getConfig();
@ -717,7 +718,7 @@ public class AMRMProxyService extends CompositeService implements
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
throw new YarnRuntimeException( throw new YarnRuntimeException(
"Could not instantiate ApplicationMasterRequestInterceptor: " "Could not instantiate ApplicationMasterRequestInterceptor: "
+ interceptorClassName, e); + interceptorClassName, e);
} }
} }
@ -729,10 +730,10 @@ public class AMRMProxyService extends CompositeService implements
} }
/** /**
* Returns the comma separated intercepter class names from the configuration. * Returns the comma separated interceptor class names from the configuration.
* *
* @param conf configuration * @param conf configuration
* @return the intercepter class names as an instance of ArrayList * @return the interceptor class names as an instance of ArrayList
*/ */
private List<String> getInterceptorClassNames(Configuration conf) { private List<String> getInterceptorClassNames(Configuration conf) {
String configuredInterceptorClassNames = String configuredInterceptorClassNames =
@ -759,7 +760,7 @@ public class AMRMProxyService extends CompositeService implements
* Authorizes the request and returns the application specific request * Authorizes the request and returns the application specific request
* processing pipeline. * processing pipeline.
* *
* @return the the intercepter wrapper instance * @return the interceptor wrapper instance
* @throws YarnException if fails * @throws YarnException if fails
*/ */
private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain() private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
@ -775,11 +776,10 @@ public class AMRMProxyService extends CompositeService implements
tokenIdentifier.getApplicationAttemptId(); tokenIdentifier.getApplicationAttemptId();
synchronized (this.applPipelineMap) { synchronized (this.applPipelineMap) {
if (!this.applPipelineMap.containsKey(appAttemptId if (!this.applPipelineMap.containsKey(appAttemptId.getApplicationId())) {
.getApplicationId())) {
throw new YarnException( throw new YarnException(
"The AM request processing pipeline is not initialized for app: " "The AM request processing pipeline is not initialized for app: "
+ appAttemptId.getApplicationId().toString()); + appAttemptId.getApplicationId());
} }
return this.applPipelineMap.get(appAttemptId.getApplicationId()); return this.applPipelineMap.get(appAttemptId.getApplicationId());
@ -827,29 +827,26 @@ public class AMRMProxyService extends CompositeService implements
/** /**
* Private class for handling application stop events. * Private class for handling application stop events.
*
*/ */
class ApplicationEventHandler implements EventHandler<ApplicationEvent> { class ApplicationEventHandler implements EventHandler<ApplicationEvent> {
@Override @Override
public void handle(ApplicationEvent event) { public void handle(ApplicationEvent event) {
Application app = Application app =
AMRMProxyService.this.nmContext.getApplications().get( AMRMProxyService.this.nmContext.getApplications().get(event.getApplicationID());
event.getApplicationID());
if (app != null) { if (app != null) {
switch (event.getType()) { switch (event.getType()) {
case APPLICATION_RESOURCES_CLEANEDUP: case APPLICATION_RESOURCES_CLEANEDUP:
LOG.info("Application stop event received for stopping AppId:" LOG.info("Application stop event received for stopping AppId: {}.",
+ event.getApplicationID().toString()); event.getApplicationID().toString());
AMRMProxyService.this.stopApplication(event.getApplicationID()); AMRMProxyService.this.stopApplication(event.getApplicationID());
break; break;
default: default:
LOG.debug("AMRMProxy is ignoring event: {}", event.getType()); LOG.debug("AMRMProxy is ignoring event: {}.", event.getType());
break; break;
} }
} else { } else {
LOG.warn("Event " + event + " sent to absent application " LOG.warn("Event {} sent to absent application {}.", event, event.getApplicationID());
+ event.getApplicationID());
} }
} }
} }
@ -866,20 +863,20 @@ public class AMRMProxyService extends CompositeService implements
/** /**
* Initializes the wrapper with the specified parameters. * Initializes the wrapper with the specified parameters.
* *
* @param rootInterceptor the root request intercepter * @param interceptor the root request interceptor
* @param applicationAttemptId attempt id * @param appAttemptId attempt id
*/ */
public synchronized void init(RequestInterceptor rootInterceptor, public synchronized void init(RequestInterceptor interceptor,
ApplicationAttemptId applicationAttemptId) { ApplicationAttemptId appAttemptId) {
this.rootInterceptor = rootInterceptor; rootInterceptor = interceptor;
this.applicationAttemptId = applicationAttemptId; applicationAttemptId = appAttemptId;
} }
/** /**
* Gets the root request intercepter. * Gets the root request interceptor.
* *
* @return the root request intercepter * @return the root request interceptor
*/ */
public synchronized RequestInterceptor getRootInterceptor() { public synchronized RequestInterceptor getRootInterceptor() {
return rootInterceptor; return rootInterceptor;

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.nodemanager.amrmproxy contains
* classes for handling federation amrm information.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -42,11 +42,19 @@ public class TestAMRMProxyMetrics extends BaseAMRMProxyTest {
Assert.assertEquals(0, metrics.getFailedRegisterAMRequests()); Assert.assertEquals(0, metrics.getFailedRegisterAMRequests());
Assert.assertEquals(0, metrics.getFailedFinishAMRequests()); Assert.assertEquals(0, metrics.getFailedFinishAMRequests());
Assert.assertEquals(0, metrics.getFailedAllocateRequests()); Assert.assertEquals(0, metrics.getFailedAllocateRequests());
Assert.assertEquals(0, metrics.getFailedAppRecoveryCount());
Assert.assertEquals(0, metrics.getFailedAppStopRequests());
Assert.assertEquals(0, metrics.getFailedUpdateAMRMTokenRequests());
Assert.assertEquals(0, metrics.getAllocateCount());
Assert.assertEquals(0, metrics.getRequestCount());
Assert.assertEquals(0, metrics.getNumSucceededAppStartRequests()); Assert.assertEquals(0, metrics.getNumSucceededAppStartRequests());
Assert.assertEquals(0, metrics.getNumSucceededRegisterAMRequests()); Assert.assertEquals(0, metrics.getNumSucceededRegisterAMRequests());
Assert.assertEquals(0, metrics.getNumSucceededFinishAMRequests()); Assert.assertEquals(0, metrics.getNumSucceededFinishAMRequests());
Assert.assertEquals(0, metrics.getNumSucceededAllocateRequests()); Assert.assertEquals(0, metrics.getNumSucceededAllocateRequests());
Assert.assertEquals(0, metrics.getNumSucceededRecoverRequests());
Assert.assertEquals(0, metrics.getNumSucceededAppStopRequests());
Assert.assertEquals(0, metrics.getNumSucceededUpdateAMRMTokenRequests());
LOG.info("Test: aggregate metrics are updated correctly"); LOG.info("Test: aggregate metrics are updated correctly");
} }
@ -57,19 +65,19 @@ public class TestAMRMProxyMetrics extends BaseAMRMProxyTest {
long failedRegisterAMRequests = metrics.getFailedRegisterAMRequests(); long failedRegisterAMRequests = metrics.getFailedRegisterAMRequests();
long failedFinishAMRequests = metrics.getFailedFinishAMRequests(); long failedFinishAMRequests = metrics.getFailedFinishAMRequests();
long failedAllocateRequests = metrics.getFailedAllocateRequests(); long failedAllocateRequests = metrics.getFailedAllocateRequests();
long failedAppRecoveryRequests = metrics.getFailedAppRecoveryCount();
long failedAppStopRequests = metrics.getFailedAppStopRequests();
long failedUpdateAMRMTokenRequests = metrics.getFailedUpdateAMRMTokenRequests();
long succeededAppStartRequests = metrics.getNumSucceededAppStartRequests(); long succeededAppStartRequests = metrics.getNumSucceededAppStartRequests();
long succeededRegisterAMRequests = long succeededRegisterAMRequests = metrics.getNumSucceededRegisterAMRequests();
metrics.getNumSucceededRegisterAMRequests();
long succeededFinishAMRequests = metrics.getNumSucceededFinishAMRequests(); long succeededFinishAMRequests = metrics.getNumSucceededFinishAMRequests();
long succeededAllocateRequests = metrics.getNumSucceededAllocateRequests(); long succeededAllocateRequests = metrics.getNumSucceededAllocateRequests();
int testAppId = 1; int testAppId = 1;
RegisterApplicationMasterResponse registerResponse = RegisterApplicationMasterResponse registerResponse = registerApplicationMaster(testAppId);
registerApplicationMaster(testAppId);
Assert.assertNotNull(registerResponse); Assert.assertNotNull(registerResponse);
Assert Assert.assertEquals(Integer.toString(testAppId), registerResponse.getQueue());
.assertEquals(Integer.toString(testAppId), registerResponse.getQueue());
AllocateResponse allocateResponse = allocate(testAppId); AllocateResponse allocateResponse = allocate(testAppId);
Assert.assertNotNull(allocateResponse); Assert.assertNotNull(allocateResponse);
@ -80,14 +88,13 @@ public class TestAMRMProxyMetrics extends BaseAMRMProxyTest {
Assert.assertNotNull(finshResponse); Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered()); Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertEquals(failedAppStartRequests, Assert.assertEquals(failedAppStartRequests, metrics.getFailedAppStartRequests());
metrics.getFailedAppStartRequests()); Assert.assertEquals(failedRegisterAMRequests, metrics.getFailedRegisterAMRequests());
Assert.assertEquals(failedRegisterAMRequests, Assert.assertEquals(failedFinishAMRequests, metrics.getFailedFinishAMRequests());
metrics.getFailedRegisterAMRequests()); Assert.assertEquals(failedAllocateRequests, metrics.getFailedAllocateRequests());
Assert.assertEquals(failedFinishAMRequests, Assert.assertEquals(failedAppRecoveryRequests, metrics.getFailedAppRecoveryCount());
metrics.getFailedFinishAMRequests()); Assert.assertEquals(failedAppStopRequests, metrics.getFailedAppStopRequests());
Assert.assertEquals(failedAllocateRequests, Assert.assertEquals(failedUpdateAMRMTokenRequests, metrics.getFailedUpdateAMRMTokenRequests());
metrics.getFailedAllocateRequests());
Assert.assertEquals(succeededAppStartRequests, Assert.assertEquals(succeededAppStartRequests,
metrics.getNumSucceededAppStartRequests()); metrics.getNumSucceededAppStartRequests());