diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java index 241eeeb62a8..f5a30828a4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java @@ -45,6 +45,16 @@ public final class AMRMProxyMetrics { private MutableGaugeLong failedAllocateRequests; @Metric("# of failed application recoveries") private MutableGaugeLong failedAppRecoveryCount; + @Metric("# of failed application stop") + private MutableGaugeLong failedAppStopRequests; + @Metric("# of failed update token") + private MutableGaugeLong failedUpdateAMRMTokenRequests; + @Metric("# all allocate requests count") + private MutableGaugeLong allocateCount; + @Metric("# all requests count") + private MutableGaugeLong requestCount; + + // Aggregate metrics are shared, and don't have to be looked up per call @Metric("Application start request latency(ms)") private MutableRate totalSucceededAppStartRequests; @@ -54,11 +64,22 @@ public final class AMRMProxyMetrics { private MutableRate totalSucceededFinishAMRequests; @Metric("Allocate latency(ms)") private MutableRate totalSucceededAllocateRequests; + @Metric("Application stop request latency(ms)") + private MutableRate totalSucceededAppStopRequests; + @Metric("Recover latency(ms)") + private MutableRate totalSucceededRecoverRequests; + @Metric("UpdateAMRMToken latency(ms)") + private MutableRate totalSucceededUpdateAMRMTokenRequests; + // Quantile latency in ms - this is needed for SLA (95%, 99%, etc) private MutableQuantiles applicationStartLatency; private MutableQuantiles registerAMLatency; private MutableQuantiles finishAMLatency; private MutableQuantiles allocateLatency; + private MutableQuantiles recoverLatency; + private MutableQuantiles applicationStopLatency; + private MutableQuantiles updateAMRMTokenLatency; + private static volatile AMRMProxyMetrics instance = null; private MetricsRegistry registry; @@ -78,6 +99,15 @@ public final class AMRMProxyMetrics { allocateLatency = registry .newQuantiles("allocateLatency", "latency of allocate", "ops", "latency", 10); + applicationStopLatency = registry + .newQuantiles("applicationStopLatency", "latency of app stop", "ops", + "latency", 10); + recoverLatency = registry + .newQuantiles("recoverLatency", "latency of recover", "ops", + "latency", 10); + updateAMRMTokenLatency = registry + .newQuantiles("updateAMRMTokenLatency", "latency of update amrm token", "ops", + "latency", 10); } /** @@ -146,16 +176,57 @@ public final class AMRMProxyMetrics { return totalSucceededAllocateRequests.lastStat().numSamples(); } + @VisibleForTesting + long getNumSucceededAppStopRequests() { + return totalSucceededAppStopRequests.lastStat().numSamples(); + } + + @VisibleForTesting + long getNumSucceededRecoverRequests() { + return totalSucceededRecoverRequests.lastStat().numSamples(); + } + + @VisibleForTesting + long getNumSucceededUpdateAMRMTokenRequests() { + return totalSucceededUpdateAMRMTokenRequests.lastStat().numSamples(); + } + + @VisibleForTesting double getLatencySucceededAllocateRequests() { return totalSucceededAllocateRequests.lastStat().mean(); } + @VisibleForTesting + double getLatencySucceededAppStopRequests() { + return totalSucceededAppStopRequests.lastStat().mean(); + } + + @VisibleForTesting + double getLatencySucceededRecoverRequests() { + return totalSucceededRecoverRequests.lastStat().mean(); + } + public void succeededAllocateRequests(long duration) { totalSucceededAllocateRequests.add(duration); allocateLatency.add(duration); } + public void succeededAppStopRequests(long duration) { + totalSucceededAppStopRequests.add(duration); + applicationStopLatency.add(duration); + } + + public void succeededRecoverRequests(long duration) { + totalSucceededRecoverRequests.add(duration); + recoverLatency.add(duration); + } + + public void succeededUpdateTokenRequests(long duration) { + totalSucceededUpdateAMRMTokenRequests.add(duration); + updateAMRMTokenLatency.add(duration); + } + long getFailedAppStartRequests() { return failedAppStartRequests.value(); } @@ -195,4 +266,36 @@ public final class AMRMProxyMetrics { public void incrFailedAppRecoveryCount() { failedAppRecoveryCount.incr(); } + + long getFailedAppStopRequests() { + return failedAppStopRequests.value(); + } + + public void incrFailedAppStopRequests() { + failedAppStopRequests.incr(); + } + + long getFailedUpdateAMRMTokenRequests() { + return failedUpdateAMRMTokenRequests.value(); + } + + public void incrFailedUpdateAMRMTokenRequests() { + failedUpdateAMRMTokenRequests.incr(); + } + + public void incrAllocateCount() { + allocateCount.incr(); + } + + public void incrRequestCount() { + requestCount.incr(); + } + + long getAllocateCount() { + return allocateCount.value(); + } + + long getRequestCount() { + return requestCount.value(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index b0d66ca027d..82873e07cdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -123,12 +125,9 @@ public class AMRMProxyService extends CompositeService implements Preconditions.checkArgument(dispatcher != null, "dispatcher is null"); this.nmContext = nmContext; this.dispatcher = dispatcher; - this.applPipelineMap = - new ConcurrentHashMap(); - - this.dispatcher.register(ApplicationEventType.class, - new ApplicationEventHandler()); + this.applPipelineMap = new ConcurrentHashMap<>(); + this.dispatcher.register(ApplicationEventType.class, new ApplicationEventHandler()); metrics = AMRMProxyMetrics.getMetrics(); } @@ -155,7 +154,7 @@ public class AMRMProxyService extends CompositeService implements @Override protected void serviceStart() throws Exception { - LOG.info("Starting AMRMProxyService"); + LOG.info("Starting AMRMProxyService."); Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); UserGroupInformation.setConfiguration(conf); @@ -182,27 +181,22 @@ public class AMRMProxyService extends CompositeService implements listenerEndpoint, serverConf, this.secretManager, numWorkerThreads); - if (conf - .getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, - false)) { - this.server.refreshServiceAcl(conf, NMPolicyProvider.getInstance()); + if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { + this.server.refreshServiceAcl(conf, NMPolicyProvider.getInstance()); } this.server.start(); - LOG.info("AMRMProxyService listening on address: " - + this.server.getListenerAddress()); + LOG.info("AMRMProxyService listening on address: {}.", this.server.getListenerAddress()); super.serviceStart(); } @Override protected void serviceStop() throws Exception { - LOG.info("Stopping AMRMProxyService"); + LOG.info("Stopping AMRMProxyService."); if (this.server != null) { this.server.stop(); } - this.secretManager.stop(); - super.serviceStop(); } @@ -212,19 +206,21 @@ public class AMRMProxyService extends CompositeService implements * @throws IOException if recover fails */ public void recover() throws IOException { - LOG.info("Recovering AMRMProxyService"); + LOG.info("Recovering AMRMProxyService."); RecoveredAMRMProxyState state = this.nmContext.getNMStateStore().loadAMRMProxyState(); this.secretManager.recover(state); - LOG.info("Recovering {} running applications for AMRMProxy", + LOG.info("Recovering {} running applications for AMRMProxy.", state.getAppContexts().size()); + for (Map.Entry> entry : state .getAppContexts().entrySet()) { ApplicationAttemptId attemptId = entry.getKey(); - LOG.info("Recovering app attempt {}", attemptId); + LOG.info("Recovering app attempt {}.", attemptId); + long startTime = clock.getTime(); // Try recover for the running application attempt try { @@ -233,19 +229,18 @@ public class AMRMProxyService extends CompositeService implements for (Map.Entry contextEntry : entry.getValue() .entrySet()) { if (contextEntry.getKey().equals(NMSS_USER_KEY)) { - user = new String(contextEntry.getValue(), "UTF-8"); + user = new String(contextEntry.getValue(), StandardCharsets.UTF_8); } else if (contextEntry.getKey().equals(NMSS_AMRMTOKEN_KEY)) { amrmToken = new Token<>(); amrmToken.decodeFromUrlString( - new String(contextEntry.getValue(), "UTF-8")); + new String(contextEntry.getValue(), StandardCharsets.UTF_8)); // Clear the service field, as if RM just issued the token amrmToken.setService(new Text()); } } if (amrmToken == null) { - throw new IOException( - "No amrmToken found for app attempt " + attemptId); + throw new IOException("No amrmToken found for app attempt " + attemptId); } if (user == null) { throw new IOException("No user found for app attempt " + attemptId); @@ -258,14 +253,14 @@ public class AMRMProxyService extends CompositeService implements // Retrieve the AM container credentials from NM context Credentials amCred = null; for (Container container : this.nmContext.getContainers().values()) { - LOG.debug("From NM Context container {}", container.getContainerId()); + LOG.debug("From NM Context container {}.", container.getContainerId()); if (container.getContainerId().getApplicationAttemptId().equals( attemptId) && container.getContainerTokenIdentifier() != null) { - LOG.debug("Container type {}", + LOG.debug("Container type {}.", container.getContainerTokenIdentifier().getContainerType()); if (container.getContainerTokenIdentifier() .getContainerType() == ContainerType.APPLICATION_MASTER) { - LOG.info("AM container {} found in context, has credentials: {}", + LOG.info("AM container {} found in context, has credentials: {}.", container.getContainerId(), (container.getCredentials() != null)); amCred = container.getCredentials(); @@ -274,15 +269,17 @@ public class AMRMProxyService extends CompositeService implements } if (amCred == null) { LOG.error("No credentials found for AM container of {}. " - + "Yarn registry access might not work", attemptId); + + "Yarn registry access might not work.", attemptId); } - // Create the intercepter pipeline for the AM + // Create the interceptor pipeline for the AM initializePipeline(attemptId, user, amrmToken, localToken, entry.getValue(), true, amCred); + long endTime = clock.getTime(); + this.metrics.succeededRecoverRequests(endTime - startTime); } catch (Throwable e) { - LOG.error("Exception when recovering " + attemptId - + ", removing it from NMStateStore and move on", e); + LOG.error("Exception when recovering {}, removing it from NMStateStore and move on.", + attemptId, e); this.metrics.incrFailedAppRecoveryCount(); this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId); } @@ -292,26 +289,28 @@ public class AMRMProxyService extends CompositeService implements /** * This is called by the AMs started on this node to register with the RM. * This method does the initial authorization and then forwards the request to - * the application instance specific intercepter chain. + * the application instance specific interceptor chain. */ @Override public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { + this.metrics.incrRequestCount(); long startTime = clock.getTime(); try { RequestInterceptorChainWrapper pipeline = authorizeAndGetInterceptorChain(); - LOG.info("Registering application master." + " Host:" + request.getHost() - + " Port:" + request.getRpcPort() + " Tracking Url:" + request - .getTrackingUrl() + " for application " + pipeline - .getApplicationAttemptId()); + + LOG.info("RegisteringAM Host: {}, Port: {}, Tracking Url: {} for application {}. ", + request.getHost(), request.getRpcPort(), request.getTrackingUrl(), + pipeline.getApplicationAttemptId()); + RegisterApplicationMasterResponse response = pipeline.getRootInterceptor().registerApplicationMaster(request); long endTime = clock.getTime(); this.metrics.succeededRegisterAMRequests(endTime - startTime); - LOG.info("RegisterAM processing finished in {} ms for application {}", + LOG.info("RegisterAM processing finished in {} ms for application {}.", endTime - startTime, pipeline.getApplicationAttemptId()); return response; } catch (Throwable t) { @@ -323,24 +322,25 @@ public class AMRMProxyService extends CompositeService implements /** * This is called by the AMs started on this node to unregister from the RM. * This method does the initial authorization and then forwards the request to - * the application instance specific intercepter chain. + * the application instance specific interceptor chain. */ @Override public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException { + this.metrics.incrRequestCount(); long startTime = clock.getTime(); try { RequestInterceptorChainWrapper pipeline = authorizeAndGetInterceptorChain(); - LOG.info("Finishing application master for {}. Tracking Url: {}", + LOG.info("Finishing application master for {}. Tracking Url: {}.", pipeline.getApplicationAttemptId(), request.getTrackingUrl()); FinishApplicationMasterResponse response = pipeline.getRootInterceptor().finishApplicationMaster(request); long endTime = clock.getTime(); this.metrics.succeededFinishAMRequests(endTime - startTime); - LOG.info("FinishAM finished with isUnregistered = {} in {} ms for {}", + LOG.info("FinishAM finished with isUnregistered = {} in {} ms for {}.", response.getIsUnregistered(), endTime - startTime, pipeline.getApplicationAttemptId()); return response; @@ -354,12 +354,13 @@ public class AMRMProxyService extends CompositeService implements * This is called by the AMs started on this node to send heart beat to RM. * This method does the initial authorization and then forwards the request to * the application instance specific pipeline, which is a chain of request - * intercepter objects. One application request processing pipeline is created + * interceptor objects. One application request processing pipeline is created * per AM instance. */ @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { + this.metrics.incrAllocateCount(); long startTime = clock.getTime(); try { AMRMTokenIdentifier amrmTokenIdentifier = @@ -373,7 +374,7 @@ public class AMRMProxyService extends CompositeService implements long endTime = clock.getTime(); this.metrics.succeededAllocateRequests(endTime - startTime); - LOG.info("Allocate processing finished in {} ms for application {}", + LOG.info("Allocate processing finished in {} ms for application {}.", endTime - startTime, pipeline.getApplicationAttemptId()); return allocateResponse; } catch (Throwable t) { @@ -392,6 +393,7 @@ public class AMRMProxyService extends CompositeService implements */ public void processApplicationStartRequest(StartContainerRequest request) throws IOException, YarnException { + this.metrics.incrRequestCount(); long startTime = clock.getTime(); try { ContainerTokenIdentifier containerTokenIdentifierForKey = @@ -408,8 +410,7 @@ public class AMRMProxyService extends CompositeService implements if (!checkIfAppExistsInStateStore(applicationID)) { return; } - LOG.info("Callback received for initializing request " - + "processing pipeline for an AM"); + LOG.info("Callback received for initializing request processing pipeline for an AM."); Credentials credentials = YarnServerSecurityUtils .parseCredentials(request.getContainerLaunchContext()); @@ -417,8 +418,7 @@ public class AMRMProxyService extends CompositeService implements getFirstAMRMToken(credentials.getAllTokens()); if (amrmToken == null) { throw new YarnRuntimeException( - "AMRMToken not found in the start container request for " - + "application:" + appAttemptId.toString()); + "AMRMToken not found in the start container request for application:" + appAttemptId); } // Substitute the existing AMRM Token with a local one. Keep the rest of @@ -445,7 +445,7 @@ public class AMRMProxyService extends CompositeService implements } /** - * Initializes the request intercepter pipeline for the specified application. + * Initializes the request interceptor pipeline for the specified application. * * @param applicationAttemptId attempt id * @param user user name @@ -465,8 +465,7 @@ public class AMRMProxyService extends CompositeService implements .containsKey(applicationAttemptId.getApplicationId())) { LOG.warn("Request to start an already existing appId was received. " + " This can happen if an application failed and a new attempt " - + "was created on this machine. ApplicationId: " - + applicationAttemptId.toString()); + + "was created on this machine. ApplicationId: {}.", applicationAttemptId); RequestInterceptorChainWrapper chainWrapperBackup = this.applPipelineMap.get(applicationAttemptId.getApplicationId()); @@ -476,8 +475,7 @@ public class AMRMProxyService extends CompositeService implements .equals(applicationAttemptId)) { // TODO: revisit in AMRMProxy HA in YARN-6128 // Remove the existing pipeline - LOG.info("Remove the previous pipeline for ApplicationId: " - + applicationAttemptId.toString()); + LOG.info("Remove the previous pipeline for ApplicationId: {}.", applicationAttemptId); RequestInterceptorChainWrapper pipeline = applPipelineMap.remove(applicationAttemptId.getApplicationId()); @@ -485,19 +483,17 @@ public class AMRMProxyService extends CompositeService implements try { this.nmContext.getNMStateStore() .removeAMRMProxyAppContext(applicationAttemptId); - } catch (IOException e) { - LOG.error("Error removing AMRMProxy application context for " - + applicationAttemptId, e); + } catch (IOException ioe) { + LOG.error("Error removing AMRMProxy application context for {}.", + applicationAttemptId, ioe); } } try { pipeline.getRootInterceptor().shutdown(); } catch (Throwable ex) { - LOG.warn( - "Failed to shutdown the request processing pipeline for app:" - + applicationAttemptId.getApplicationId(), - ex); + LOG.warn("Failed to shutdown the request processing pipeline for app: {}.", + applicationAttemptId.getApplicationId(), ex); } } else { return; @@ -510,12 +506,11 @@ public class AMRMProxyService extends CompositeService implements } // We register the pipeline instance in the map first and then initialize it - // later because chain initialization can be expensive and we would like to + // later because chain initialization can be expensive, and we would like to // release the lock as soon as possible to prevent other applications from // blocking when one application's chain is initializing LOG.info("Initializing request processing pipeline for application. " - + " ApplicationId:" + applicationAttemptId + " for the user: " - + user); + + " ApplicationId: {} for the user: {}.", applicationAttemptId, user); try { RequestInterceptor interceptorChain = @@ -525,8 +520,7 @@ public class AMRMProxyService extends CompositeService implements user, amrmToken, localToken, credentials, this.registry)); if (isRecovery) { if (recoveredDataMap == null) { - throw new YarnRuntimeException( - "null recoveredDataMap recieved for recover"); + throw new YarnRuntimeException("null recoveredDataMap received for recover"); } interceptorChain.recover(recoveredDataMap); } @@ -535,13 +529,13 @@ public class AMRMProxyService extends CompositeService implements if (!isRecovery && this.nmContext.getNMStateStore() != null) { try { this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( - applicationAttemptId, NMSS_USER_KEY, user.getBytes("UTF-8")); + applicationAttemptId, NMSS_USER_KEY, user.getBytes(StandardCharsets.UTF_8)); this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( applicationAttemptId, NMSS_AMRMTOKEN_KEY, - amrmToken.encodeToUrlString().getBytes("UTF-8")); + amrmToken.encodeToUrlString().getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { - LOG.error("Error storing AMRMProxy application context entry for " - + applicationAttemptId, e); + LOG.error("Error storing AMRMProxy application context entry for {}.", + applicationAttemptId, e); } } } catch (Exception e) { @@ -557,29 +551,27 @@ public class AMRMProxyService extends CompositeService implements * @param applicationId application id */ protected void stopApplication(ApplicationId applicationId) { - Preconditions.checkArgument(applicationId != null, - "applicationId is null"); + this.metrics.incrRequestCount(); + Preconditions.checkArgument(applicationId != null, "applicationId is null"); RequestInterceptorChainWrapper pipeline = this.applPipelineMap.remove(applicationId); + boolean isStopSuccess = true; + long startTime = clock.getTime(); if (pipeline == null) { - LOG.info( - "No interceptor pipeline for application {}," - + " likely because its AM is not run in this node.", - applicationId); + LOG.info("No interceptor pipeline for application {}," + + " likely because its AM is not run in this node.", applicationId); + isStopSuccess = false; } else { // Remove the appAttempt in AMRMTokenSecretManager - this.secretManager - .applicationMasterFinished(pipeline.getApplicationAttemptId()); - - LOG.info("Stopping the request processing pipeline for application: " - + applicationId); + this.secretManager.applicationMasterFinished(pipeline.getApplicationAttemptId()); + LOG.info("Stopping the request processing pipeline for application: {}.", applicationId); try { pipeline.getRootInterceptor().shutdown(); } catch (Throwable ex) { - LOG.warn( - "Failed to shutdown the request processing pipeline for app:" - + applicationId, ex); + LOG.warn("Failed to shutdown the request processing pipeline for app: {}.", + applicationId, ex); + isStopSuccess = false; } // Remove the app context from NMSS after the interceptors are shutdown @@ -588,74 +580,83 @@ public class AMRMProxyService extends CompositeService implements this.nmContext.getNMStateStore() .removeAMRMProxyAppContext(pipeline.getApplicationAttemptId()); } catch (IOException e) { - LOG.error("Error removing AMRMProxy application context for " - + applicationId, e); + LOG.error("Error removing AMRMProxy application context for {}.", + applicationId, e); + isStopSuccess = false; } } } + + if (isStopSuccess) { + long endTime = clock.getTime(); + this.metrics.succeededAppStopRequests(endTime - startTime); + } else { + this.metrics.incrFailedAppStopRequests(); + } } private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, RequestInterceptorChainWrapper pipeline, AllocateResponse allocateResponse) { + AMRMProxyApplicationContextImpl context = - (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor() - .getApplicationContext(); + (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor().getApplicationContext(); - // check to see if the RM has issued a new AMRMToken & accordingly update - // the real ARMRMToken in the current context - if (allocateResponse.getAMRMToken() != null) { - LOG.info("RM rolled master-key for amrm-tokens"); + try { + long startTime = clock.getTime(); - org.apache.hadoop.yarn.api.records.Token token = - allocateResponse.getAMRMToken(); + // check to see if the RM has issued a new AMRMToken & accordingly update + // the real ARMRMToken in the current context + if (allocateResponse.getAMRMToken() != null) { + LOG.info("RM rolled master-key for amrm-tokens."); - // Do not propagate this info back to AM - allocateResponse.setAMRMToken(null); + org.apache.hadoop.yarn.api.records.Token token = allocateResponse.getAMRMToken(); - org.apache.hadoop.security.token.Token newToken = - ConverterUtils.convertFromYarn(token, (Text) null); + // Do not propagate this info back to AM + allocateResponse.setAMRMToken(null); - // Update the AMRMToken in context map, and in NM state store if it is - // different - if (context.setAMRMToken(newToken) - && this.nmContext.getNMStateStore() != null) { - try { + org.apache.hadoop.security.token.Token newToken = + ConverterUtils.convertFromYarn(token, (Text) null); + + // Update the AMRMToken in context map, and in NM state store if it is + // different + if (context.setAMRMToken(newToken) && this.nmContext.getNMStateStore() != null) { this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY, - newToken.encodeToUrlString().getBytes("UTF-8")); - } catch (IOException e) { - LOG.error("Error storing AMRMProxy application context entry for " - + context.getApplicationAttemptId(), e); + newToken.encodeToUrlString().getBytes(StandardCharsets.UTF_8)); } } - } - // Check if the local AMRMToken is rolled up and update the context and - // response accordingly - MasterKeyData nextMasterKey = - this.secretManager.getNextMasterKeyData(); + // Check if the local AMRMToken is rolled up and update the context and + // response accordingly + MasterKeyData nextMasterKey = this.secretManager.getNextMasterKeyData(); - if (nextMasterKey != null - && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier - .getKeyId()) { - Token localToken = context.getLocalAMRMToken(); - if (nextMasterKey.getMasterKey().getKeyId() != context - .getLocalAMRMTokenKeyId()) { - LOG.info("The local AMRMToken has been rolled-over." - + " Send new local AMRMToken back to application: " - + pipeline.getApplicationId()); - localToken = - this.secretManager.createAndGetAMRMToken(pipeline - .getApplicationAttemptId()); - context.setLocalAMRMToken(localToken); + if (nextMasterKey != null) { + MasterKey masterKey = nextMasterKey.getMasterKey(); + if (masterKey.getKeyId() != amrmTokenIdentifier.getKeyId()) { + Token localToken = context.getLocalAMRMToken(); + if (masterKey.getKeyId() != context.getLocalAMRMTokenKeyId()) { + LOG.info("The local AMRMToken has been rolled-over." + + " Send new local AMRMToken back to application: {}", + pipeline.getApplicationId()); + localToken = this.secretManager.createAndGetAMRMToken( + pipeline.getApplicationAttemptId()); + context.setLocalAMRMToken(localToken); + } + + allocateResponse + .setAMRMToken(org.apache.hadoop.yarn.api.records.Token + .newInstance(localToken.getIdentifier(), localToken + .getKind().toString(), localToken.getPassword(), + localToken.getService().toString())); + } } - - allocateResponse - .setAMRMToken(org.apache.hadoop.yarn.api.records.Token - .newInstance(localToken.getIdentifier(), localToken - .getKind().toString(), localToken.getPassword(), - localToken.getService().toString())); + long endTime = clock.getTime(); + this.metrics.succeededUpdateTokenRequests(endTime - startTime); + } catch (IOException e) { + LOG.error("Error storing AMRMProxy application context entry for {}.", + context.getApplicationAttemptId(), e); + this.metrics.incrFailedUpdateAMRMTokenRequests(); } } @@ -672,19 +673,19 @@ public class AMRMProxyService extends CompositeService implements } /** - * Gets the Request intercepter chains for all the applications. + * Gets the Request interceptor chains for all the applications. * - * @return the request intercepter chains. + * @return the request interceptor chains. */ protected Map getPipelines() { return this.applPipelineMap; } /** - * This method creates and returns reference of the first intercepter in the - * chain of request intercepter instances. + * This method creates and returns reference of the first interceptor in the + * chain of request interceptor instances. * - * @return the reference of the first intercepter in the chain + * @return the reference of the first interceptor in the chain */ protected RequestInterceptor createRequestInterceptorChain() { Configuration conf = getConfig(); @@ -717,7 +718,7 @@ public class AMRMProxyService extends CompositeService implements } catch (ClassNotFoundException e) { throw new YarnRuntimeException( "Could not instantiate ApplicationMasterRequestInterceptor: " - + interceptorClassName, e); + + interceptorClassName, e); } } @@ -729,10 +730,10 @@ public class AMRMProxyService extends CompositeService implements } /** - * Returns the comma separated intercepter class names from the configuration. + * Returns the comma separated interceptor class names from the configuration. * * @param conf configuration - * @return the intercepter class names as an instance of ArrayList + * @return the interceptor class names as an instance of ArrayList */ private List getInterceptorClassNames(Configuration conf) { String configuredInterceptorClassNames = @@ -759,7 +760,7 @@ public class AMRMProxyService extends CompositeService implements * Authorizes the request and returns the application specific request * processing pipeline. * - * @return the the intercepter wrapper instance + * @return the interceptor wrapper instance * @throws YarnException if fails */ private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain() @@ -775,11 +776,10 @@ public class AMRMProxyService extends CompositeService implements tokenIdentifier.getApplicationAttemptId(); synchronized (this.applPipelineMap) { - if (!this.applPipelineMap.containsKey(appAttemptId - .getApplicationId())) { + if (!this.applPipelineMap.containsKey(appAttemptId.getApplicationId())) { throw new YarnException( "The AM request processing pipeline is not initialized for app: " - + appAttemptId.getApplicationId().toString()); + + appAttemptId.getApplicationId()); } return this.applPipelineMap.get(appAttemptId.getApplicationId()); @@ -827,29 +827,26 @@ public class AMRMProxyService extends CompositeService implements /** * Private class for handling application stop events. - * */ class ApplicationEventHandler implements EventHandler { @Override public void handle(ApplicationEvent event) { Application app = - AMRMProxyService.this.nmContext.getApplications().get( - event.getApplicationID()); + AMRMProxyService.this.nmContext.getApplications().get(event.getApplicationID()); if (app != null) { switch (event.getType()) { case APPLICATION_RESOURCES_CLEANEDUP: - LOG.info("Application stop event received for stopping AppId:" - + event.getApplicationID().toString()); + LOG.info("Application stop event received for stopping AppId: {}.", + event.getApplicationID().toString()); AMRMProxyService.this.stopApplication(event.getApplicationID()); break; default: - LOG.debug("AMRMProxy is ignoring event: {}", event.getType()); + LOG.debug("AMRMProxy is ignoring event: {}.", event.getType()); break; } } else { - LOG.warn("Event " + event + " sent to absent application " - + event.getApplicationID()); + LOG.warn("Event {} sent to absent application {}.", event, event.getApplicationID()); } } } @@ -866,20 +863,20 @@ public class AMRMProxyService extends CompositeService implements /** * Initializes the wrapper with the specified parameters. - * - * @param rootInterceptor the root request intercepter - * @param applicationAttemptId attempt id + * + * @param interceptor the root request interceptor + * @param appAttemptId attempt id */ - public synchronized void init(RequestInterceptor rootInterceptor, - ApplicationAttemptId applicationAttemptId) { - this.rootInterceptor = rootInterceptor; - this.applicationAttemptId = applicationAttemptId; + public synchronized void init(RequestInterceptor interceptor, + ApplicationAttemptId appAttemptId) { + rootInterceptor = interceptor; + applicationAttemptId = appAttemptId; } /** - * Gets the root request intercepter. + * Gets the root request interceptor. * - * @return the root request intercepter + * @return the root request interceptor */ public synchronized RequestInterceptor getRootInterceptor() { return rootInterceptor; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/package-info.java new file mode 100644 index 00000000000..2f20d98426f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.nodemanager.amrmproxy contains + * classes for handling federation amrm information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java index 4621c4dce32..6219641c6ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java @@ -42,11 +42,19 @@ public class TestAMRMProxyMetrics extends BaseAMRMProxyTest { Assert.assertEquals(0, metrics.getFailedRegisterAMRequests()); Assert.assertEquals(0, metrics.getFailedFinishAMRequests()); Assert.assertEquals(0, metrics.getFailedAllocateRequests()); + Assert.assertEquals(0, metrics.getFailedAppRecoveryCount()); + Assert.assertEquals(0, metrics.getFailedAppStopRequests()); + Assert.assertEquals(0, metrics.getFailedUpdateAMRMTokenRequests()); + Assert.assertEquals(0, metrics.getAllocateCount()); + Assert.assertEquals(0, metrics.getRequestCount()); Assert.assertEquals(0, metrics.getNumSucceededAppStartRequests()); Assert.assertEquals(0, metrics.getNumSucceededRegisterAMRequests()); Assert.assertEquals(0, metrics.getNumSucceededFinishAMRequests()); Assert.assertEquals(0, metrics.getNumSucceededAllocateRequests()); + Assert.assertEquals(0, metrics.getNumSucceededRecoverRequests()); + Assert.assertEquals(0, metrics.getNumSucceededAppStopRequests()); + Assert.assertEquals(0, metrics.getNumSucceededUpdateAMRMTokenRequests()); LOG.info("Test: aggregate metrics are updated correctly"); } @@ -57,19 +65,19 @@ public class TestAMRMProxyMetrics extends BaseAMRMProxyTest { long failedRegisterAMRequests = metrics.getFailedRegisterAMRequests(); long failedFinishAMRequests = metrics.getFailedFinishAMRequests(); long failedAllocateRequests = metrics.getFailedAllocateRequests(); + long failedAppRecoveryRequests = metrics.getFailedAppRecoveryCount(); + long failedAppStopRequests = metrics.getFailedAppStopRequests(); + long failedUpdateAMRMTokenRequests = metrics.getFailedUpdateAMRMTokenRequests(); long succeededAppStartRequests = metrics.getNumSucceededAppStartRequests(); - long succeededRegisterAMRequests = - metrics.getNumSucceededRegisterAMRequests(); + long succeededRegisterAMRequests = metrics.getNumSucceededRegisterAMRequests(); long succeededFinishAMRequests = metrics.getNumSucceededFinishAMRequests(); long succeededAllocateRequests = metrics.getNumSucceededAllocateRequests(); int testAppId = 1; - RegisterApplicationMasterResponse registerResponse = - registerApplicationMaster(testAppId); + RegisterApplicationMasterResponse registerResponse = registerApplicationMaster(testAppId); Assert.assertNotNull(registerResponse); - Assert - .assertEquals(Integer.toString(testAppId), registerResponse.getQueue()); + Assert.assertEquals(Integer.toString(testAppId), registerResponse.getQueue()); AllocateResponse allocateResponse = allocate(testAppId); Assert.assertNotNull(allocateResponse); @@ -80,14 +88,13 @@ public class TestAMRMProxyMetrics extends BaseAMRMProxyTest { Assert.assertNotNull(finshResponse); Assert.assertEquals(true, finshResponse.getIsUnregistered()); - Assert.assertEquals(failedAppStartRequests, - metrics.getFailedAppStartRequests()); - Assert.assertEquals(failedRegisterAMRequests, - metrics.getFailedRegisterAMRequests()); - Assert.assertEquals(failedFinishAMRequests, - metrics.getFailedFinishAMRequests()); - Assert.assertEquals(failedAllocateRequests, - metrics.getFailedAllocateRequests()); + Assert.assertEquals(failedAppStartRequests, metrics.getFailedAppStartRequests()); + Assert.assertEquals(failedRegisterAMRequests, metrics.getFailedRegisterAMRequests()); + Assert.assertEquals(failedFinishAMRequests, metrics.getFailedFinishAMRequests()); + Assert.assertEquals(failedAllocateRequests, metrics.getFailedAllocateRequests()); + Assert.assertEquals(failedAppRecoveryRequests, metrics.getFailedAppRecoveryCount()); + Assert.assertEquals(failedAppStopRequests, metrics.getFailedAppStopRequests()); + Assert.assertEquals(failedUpdateAMRMTokenRequests, metrics.getFailedUpdateAMRMTokenRequests()); Assert.assertEquals(succeededAppStartRequests, metrics.getNumSucceededAppStartRequests());