From 02b9bfdf9e4bd0b3c05ca5fd75399dedcb656e09 Mon Sep 17 00:00:00 2001 From: Giovanni Matteo Fumarola Date: Wed, 12 Sep 2018 11:46:35 -0700 Subject: [PATCH] YARN-8658. [AMRMProxy] Metrics for AMRMClientRelayer inside FederationInterceptor. Contributed by Young Chen. --- .../hadoop/yarn/server/AMRMClientRelayer.java | 424 ++++++++++----- .../metrics/AMRMClientRelayerMetrics.java | 368 +++++++++++++ .../yarn/server/metrics/package-info.java | 18 + .../server/uam/UnmanagedAMPoolManager.java | 30 +- .../uam/UnmanagedApplicationManager.java | 16 +- .../yarn/server/TestAMRMClientRelayer.java | 2 +- .../metrics/TestAMRMClientRelayerMetrics.java | 513 ++++++++++++++++++ .../uam/TestUnmanagedApplicationManager.java | 2 +- .../amrmproxy/FederationInterceptor.java | 19 +- .../TestableFederationInterceptor.java | 5 +- 10 files changed, 1248 insertions(+), 149 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java index 1e2060c752e..2621d3e0a30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.metrics.AMRMClientRelayerMetrics; import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey; import org.slf4j.Logger; @@ -98,6 +100,15 @@ public class AMRMClientRelayer extends AbstractService private Set ask = new TreeSet<>(new ResourceRequest.ResourceRequestComparator()); + /** + * Data structures for pending and allocate latency metrics. This only applies + * for requests with non-zero allocationRequestId. 
+ */ + private Map&lt;Long, Integer&gt; pendingCountForMetrics = new HashMap&lt;&gt;(); + private Map&lt;Long, Long&gt; askTimeStamp = new HashMap&lt;&gt;(); + // List of allocated containerId to avoid double counting + private Set&lt;ContainerId&gt; knownContainers = new HashSet&lt;&gt;(); + private Set&lt;ContainerId&gt; remotePendingRelease = new HashSet&lt;&gt;(); private Set&lt;ContainerId&gt; release = new HashSet&lt;&gt;(); @@ -108,6 +119,7 @@ public class AMRMClientRelayer extends AbstractService private Map&lt;ContainerId, UpdateContainerRequest&gt; remotePendingChange = new HashMap&lt;&gt;(); private Map&lt;ContainerId, UpdateContainerRequest&gt; change = new HashMap&lt;&gt;(); + private Map&lt;ContainerId, Long&gt; changeTimeStamp = new HashMap&lt;&gt;(); private Map&lt;Set&lt;String&gt;, List&lt;SchedulingRequest&gt;&gt; remotePendingSchedRequest = new HashMap&lt;&gt;(); @@ -119,16 +131,26 @@ public class AMRMClientRelayer extends AbstractService // heartbeat private volatile int resetResponseId; + private String rmId = ""; + private volatile boolean shutdown = false; + + private AMRMClientRelayerMetrics metrics; + public AMRMClientRelayer() { super(AMRMClientRelayer.class.getName()); this.resetResponseId = -1; + this.metrics = AMRMClientRelayerMetrics.getInstance(); + this.rmClient = null; + this.appId = null; + this.rmId = ""; } public AMRMClientRelayer(ApplicationMasterProtocol rmClient, - ApplicationId appId) { + ApplicationId appId, String rmId) { this(); this.rmClient = rmClient; this.appId = appId; + this.rmId = rmId; } @Override @@ -155,6 +177,7 @@ public class AMRMClientRelayer extends AbstractService if (this.rmClient != null) { RPC.stopProxy(this.rmClient); } + shutdown(); super.serviceStop(); } @@ -163,6 +186,49 @@ public class AMRMClientRelayer extends AbstractService this.amRegistrationRequest = registerRequest; } + public void setRMClient(ApplicationMasterProtocol client) { + this.rmClient = client; + } + + public void shutdown() { + // On finish, clear out our pending count from the metrics + // and set the shutdown flag so no more pending requests get + // added + synchronized (this) { + if (this.shutdown) { + LOG.warn( + "Shutdown called twice for AMRMClientRelayer for RM " + this.rmId); + return; + } + this.shutdown = true; + for (Map.Entry&lt;ResourceRequestSetKey, ResourceRequestSet&gt; entry + : this.remotePendingAsks.entrySet()) { + ResourceRequestSetKey key = entry.getKey(); + if (key.getAllocationRequestId() == 0) { + this.metrics.decrClientPending(this.rmId, + AMRMClientRelayerMetrics.getRequestType(key.getExeType()), + entry.getValue().getNumContainers()); + } else { + this.askTimeStamp.remove(key.getAllocationRequestId()); + Integer pending = + this.pendingCountForMetrics.remove(key.getAllocationRequestId()); + if (pending == null) { + throw new YarnRuntimeException( + "pendingCountForMetrics not found for key " + key + + " during shutdown"); + } + this.metrics.decrClientPending(this.rmId, + AMRMClientRelayerMetrics.getRequestType(key.getExeType()), + pending); + } + } + for (UpdateContainerRequest req : remotePendingChange.values()) { + this.metrics + .decrClientPending(rmId, req.getContainerUpdateType(), 1); + } + } + } + @Override public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) @@ -178,7 +244,8 @@ public class AMRMClientRelayer extends AbstractService try { return this.rmClient.finishApplicationMaster(request); } catch (ApplicationMasterNotRegisteredException e) { - LOG.warn("Out of sync with RM for " + this.appId + ", hence resyncing."); + LOG.warn("Out of sync with RM " + rmId + + " for " + this.appId + ", hence resyncing."); // re-register with RM registerApplicationMaster(this.amRegistrationRequest); return finishApplicationMaster(request); @@ -215,7 +282,23 @@ public class AMRMClientRelayer extends AbstractService if 
(allocateRequest.getUpdateRequests() != null) { for (UpdateContainerRequest update : allocateRequest .getUpdateRequests()) { - this.remotePendingChange.put(update.getContainerId(), update); + UpdateContainerRequest req = + this.remotePendingChange.put(update.getContainerId(), update); + this.changeTimeStamp + .put(update.getContainerId(), System.currentTimeMillis()); + if (req == null) { + // If this is a brand new request, all we have to do is increment + this.metrics + .incrClientPending(rmId, update.getContainerUpdateType(), 1); + } else if (req.getContainerUpdateType() != update + .getContainerUpdateType()) { + // If this is replacing a request with a different update type, we + // need to decrement the replaced type + this.metrics + .decrClientPending(rmId, req.getContainerUpdateType(), 1); + this.metrics + .incrClientPending(rmId, update.getContainerUpdateType(), 1); + } this.change.put(update.getContainerId(), update); } }
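
The bookkeeping in the hunk above keeps at most one pending container update counted per container, tracked under the most recently requested update type. A sketch of the resulting gauge movements, reusing the createPromote/createDemote helpers from the test class added later in this patch (requestWith is a hypothetical wrapper that builds an AllocateRequest from update requests, not part of the patch):

    // Promote container 1, then replace the promote with a demote:
    relayer.allocate(requestWith(createPromote(1))); // pending: Promote 1, Demote 0
    relayer.allocate(requestWith(createDemote(1)));  // pending: Promote 0, Demote 1
    relayer.allocate(requestWith(createDemote(1)));  // same type re-sent: no change
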
@@ -232,141 +315,196 @@ public class AMRMClientRelayer extends AbstractService public AllocateResponse allocate(AllocateRequest allocateRequest) throws YarnException, IOException { AllocateResponse allocateResponse = null; + long startTime = System.currentTimeMillis(); + synchronized (this) { + if (this.shutdown) { + throw new YarnException("Allocate called after AMRMClientRelayer for " + + "RM " + rmId + " shutdown."); + } + addNewAllocateRequest(allocateRequest); + + ArrayList&lt;ResourceRequest&gt; askList = new ArrayList&lt;&gt;(ask.size()); + for (ResourceRequest r : ask) { + // create a copy of ResourceRequest as we might change it while the + // RPC layer is using it to send info across + askList.add(ResourceRequest.clone(r)); + } + + allocateRequest = AllocateRequest.newBuilder() + .responseId(allocateRequest.getResponseId()) + .progress(allocateRequest.getProgress()).askList(askList) + .releaseList(new ArrayList&lt;&gt;(this.release)) + .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance( + new ArrayList&lt;&gt;(this.blacklistAdditions), + new ArrayList&lt;&gt;(this.blacklistRemovals))) + .updateRequests(new ArrayList&lt;&gt;(this.change.values())) + .schedulingRequests(new ArrayList&lt;&gt;(this.schedulingRequest)) + .build(); + + if (this.resetResponseId != -1) { + LOG.info("Override allocate responseId from " + + allocateRequest.getResponseId() + " to " + this.resetResponseId + + " for " + this.appId); + allocateRequest.setResponseId(this.resetResponseId); + } + } + + // Do the actual allocate call try { + allocateResponse = this.rmClient.allocate(allocateRequest); + + // Heartbeat succeeded, wipe out responseId overriding + this.resetResponseId = -1; + } catch (ApplicationMasterNotRegisteredException e) { + // This is a retriable exception - we will re-register and make a + // recursive call to retry + LOG.warn("ApplicationMaster is out of sync with RM " + rmId + + " for " + this.appId + ", hence resyncing."); + + this.metrics.incrRMMasterSlaveSwitch(this.rmId); + synchronized (this) { - addNewAllocateRequest(allocateRequest); - - ArrayList&lt;ResourceRequest&gt; askList = new ArrayList&lt;&gt;(ask.size()); - for (ResourceRequest r : ask) { - // create a copy of ResourceRequest as we might change it while the - // RPC layer is using it to send info across - askList.add(ResourceRequest.clone(r)); + // Add all remotePending data into to-send data structures + for (ResourceRequestSet requestSet : this.remotePendingAsks + .values()) { + for (ResourceRequest rr : requestSet.getRRs()) { + addResourceRequestToAsk(rr); + } } - - allocateRequest = AllocateRequest.newBuilder() - .responseId(allocateRequest.getResponseId()) - .progress(allocateRequest.getProgress()).askList(askList) - .releaseList(new ArrayList&lt;&gt;(this.release)) - .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance( - new ArrayList&lt;&gt;(this.blacklistAdditions), - new ArrayList&lt;&gt;(this.blacklistRemovals))) - .updateRequests(new ArrayList&lt;&gt;(this.change.values())) - .schedulingRequests(new ArrayList&lt;&gt;(this.schedulingRequest)) - .build(); - - if (this.resetResponseId != -1) { - LOG.info("Override allocate responseId from " - + allocateRequest.getResponseId() + " to " + this.resetResponseId - + " for " + this.appId); - allocateRequest.setResponseId(this.resetResponseId); + this.release.addAll(this.remotePendingRelease); + this.blacklistAdditions.addAll(this.remoteBlacklistedNodes); + this.change.putAll(this.remotePendingChange); + for (List&lt;SchedulingRequest&gt; reqs : this.remotePendingSchedRequest + .values()) { + this.schedulingRequest.addAll(reqs); } } - // Do the actual allocate call - try { - allocateResponse = this.rmClient.allocate(allocateRequest); + // re-register with RM, then retry allocate recursively + registerApplicationMaster(this.amRegistrationRequest); + // Reset responseId after re-register + allocateRequest.setResponseId(0); + allocateResponse = allocate(allocateRequest); + return allocateResponse; + } catch (Throwable t) { + // Unexpected exception - rethrow and increment heartbeat failure metric + this.metrics.addHeartbeatFailure(this.rmId, + System.currentTimeMillis() - startTime); - // Heartbeat succeeded, wipe out responseId overriding - this.resetResponseId = -1; - } catch (ApplicationMasterNotRegisteredException e) { - LOG.warn("ApplicationMaster is out of sync with RM for " + this.appId + " hence resyncing."); + // If RM is complaining about responseId out of sync, force reset next + // time + if (t instanceof InvalidApplicationMasterRequestException) { + int responseId = AMRMClientUtils + .parseExpectedResponseIdFromException(t.getMessage()); + if (responseId != -1) { + this.resetResponseId = responseId; + LOG.info("ResponseId out of sync with RM, expect " + responseId + + " but " + allocateRequest.getResponseId() + " used by " + + this.appId + ". Will override in the next allocate."); + } else { + LOG.warn("Failed to parse expected responseId out of exception for " + + this.appId); + } + } - synchronized (this) { - // Add all remotePending data into to-send data structures - for (ResourceRequestSet requestSet : this.remotePendingAsks - .values()) { - for (ResourceRequest rr : requestSet.getRRs()) { - addResourceRequestToAsk(rr); + throw t; + }
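
// Heartbeat accounting is deliberately split across these paths: the
// resync catch above only bumps the master-slave switch counter, the
// Throwable catch records a failure plus its latency, and the success
// counter and latency are recorded in updateMetrics() on the way out.
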
+ + synchronized (this) { + if (this.shutdown) { + throw new YarnException("Allocate call succeeded for " + this.appId + + " after AMRMClientRelayer for RM " + rmId + " shutdown."); + } + + updateMetrics(allocateResponse, startTime); + + AMRMClientUtils.removeFromOutstandingSchedulingRequests( + allocateResponse.getAllocatedContainers(), + this.remotePendingSchedRequest); + AMRMClientUtils.removeFromOutstandingSchedulingRequests( + allocateResponse.getContainersFromPreviousAttempts(), + this.remotePendingSchedRequest); + + this.ask.clear(); + this.release.clear(); + + this.blacklistAdditions.clear(); + this.blacklistRemovals.clear(); + + this.change.clear(); + this.schedulingRequest.clear(); + return allocateResponse; + } + } + + private void updateMetrics(AllocateResponse allocateResponse, + long startTime) { + this.metrics.addHeartbeatSuccess(this.rmId, + System.currentTimeMillis() - startTime); + // Process the allocate response from RM + if (allocateResponse.getAllocatedContainers() != null) { + for (Container container : allocateResponse + .getAllocatedContainers()) { + // Do not update metrics aggressively for AllocationRequestId zero + // case. Also avoid double counting due to re-sends + if (this.knownContainers.add(container.getId())) { + this.metrics.addFulfilledQPS(this.rmId, AMRMClientRelayerMetrics + .getRequestType(container.getExecutionType()), 1); + if (container.getAllocationRequestId() != 0) { + Integer count = this.pendingCountForMetrics + .get(container.getAllocationRequestId()); + if (count != null &amp;&amp; count &gt; 0) { + this.pendingCountForMetrics + .put(container.getAllocationRequestId(), --count); + this.metrics.decrClientPending(this.rmId, + AMRMClientRelayerMetrics + .getRequestType(container.getExecutionType()), 1); + this.metrics.addFulfillLatency(this.rmId, + AMRMClientRelayerMetrics + .getRequestType(container.getExecutionType()), + System.currentTimeMillis() - this.askTimeStamp + .get(container.getAllocationRequestId())); } } - this.release.addAll(this.remotePendingRelease); - this.blacklistAdditions.addAll(this.remoteBlacklistedNodes); - this.change.putAll(this.remotePendingChange); - for (List&lt;SchedulingRequest&gt; reqs : this.remotePendingSchedRequest - .values()) { - this.schedulingRequest.addAll(reqs); - } - } - - // re-register with RM, then retry allocate recursively - registerApplicationMaster(this.amRegistrationRequest); - // Reset responseId after re-register - allocateRequest.setResponseId(0); - return allocate(allocateRequest); - } catch (Throwable t) { - - // If RM is complaining about responseId out of sync, force reset next - // time - if (t instanceof InvalidApplicationMasterRequestException) { - int responseId = AMRMClientUtils - .parseExpectedResponseIdFromException(t.getMessage()); - if (responseId != -1) { - this.resetResponseId = responseId; - LOG.info("ResponseId out of sync with RM, expect " + responseId - + " but " + allocateRequest.getResponseId() + " used by " - + this.appId + ". 
Will override in the next allocate."); - } else { - LOG.warn("Failed to parse expected responseId out of exception for " - + this.appId); - } - } - - throw t; - } - - synchronized (this) { - // Process the allocate response from RM - if (allocateResponse.getCompletedContainersStatuses() != null) { - for (ContainerStatus container : allocateResponse - .getCompletedContainersStatuses()) { - this.remotePendingRelease.remove(container.getContainerId()); - this.remotePendingChange.remove(container.getContainerId()); - } - } - - if (allocateResponse.getUpdatedContainers() != null) { - for (UpdatedContainer updatedContainer : allocateResponse - .getUpdatedContainers()) { - this.remotePendingChange - .remove(updatedContainer.getContainer().getId()); - } - } - - AMRMClientUtils.removeFromOutstandingSchedulingRequests( - allocateResponse.getAllocatedContainers(), - this.remotePendingSchedRequest); - AMRMClientUtils.removeFromOutstandingSchedulingRequests( - allocateResponse.getContainersFromPreviousAttempts(), - this.remotePendingSchedRequest); - } - - } finally { - synchronized (this) { - /* - * If allocateResponse is null, it means exception happened and RM did - * not accept the request. Don't clear any data structures so that they - * will be re-sent next time. - * - * Otherwise request was accepted by RM, we are safe to clear these. - */ - if (allocateResponse != null) { - this.ask.clear(); - this.release.clear(); - - this.blacklistAdditions.clear(); - this.blacklistRemovals.clear(); - - this.change.clear(); - this.schedulingRequest.clear(); } } } - return allocateResponse; + if (allocateResponse.getCompletedContainersStatuses() != null) { + for (ContainerStatus container : allocateResponse + .getCompletedContainersStatuses()) { + this.remotePendingRelease.remove(container.getContainerId()); + UpdateContainerRequest req = + this.remotePendingChange.remove(container.getContainerId()); + if (req != null) { + this.metrics + .decrClientPending(rmId, req.getContainerUpdateType(), 1); + } + this.knownContainers.remove(container.getContainerId()); + } + } + + if (allocateResponse.getUpdatedContainers() != null) { + for (UpdatedContainer updatedContainer : allocateResponse + .getUpdatedContainers()) { + UpdateContainerRequest req = this.remotePendingChange + .remove(updatedContainer.getContainer().getId()); + if (req != null) { + this.metrics + .decrClientPending(rmId, req.getContainerUpdateType(), 1); + this.metrics.addFulfillLatency(rmId, req.getContainerUpdateType(), + System.currentTimeMillis() - this.changeTimeStamp + .remove(req.getContainerId())); + this.metrics + .addFulfilledQPS(rmId, req.getContainerUpdateType(), 1); + } + } + } + } private void addNewAsks(List asks) throws YarnException { Set touchedKeys = new HashSet<>(); + Set nonZeroNewKeys = new HashSet<>(); for (ResourceRequest rr : asks) { addResourceRequestToAsk(rr); @@ -377,8 +515,38 @@ public class AMRMClientRelayer extends AbstractService if (askSet == null) { askSet = new ResourceRequestSet(key); this.remotePendingAsks.put(key, askSet); + if (key.getAllocationRequestId() != 0) { + nonZeroNewKeys.add(key); + } } + + int numContainers = askSet.getNumContainers(); askSet.addAndOverrideRR(rr); + int deltaContainers = askSet.getNumContainers() - numContainers; + + if (key.getAllocationRequestId() == 0) { + // AllocationRequestId is zero, keep track of pending count in the + // delayed but correct way. 
Allocation latency is not supported + if (deltaContainers != 0) { + this.metrics.incrClientPending(this.rmId, + AMRMClientRelayerMetrics.getRequestType(key.getExeType()), + deltaContainers); + if(deltaContainers > 0){ + this.metrics.addRequestedQPS(this.rmId, + AMRMClientRelayerMetrics.getRequestType(key.getExeType()), + deltaContainers); + } + } + } else { + // AllocationRequestId is non-zero, we do pending decrement and latency + // aggressively. So don't update metrics here. Double check AM is not + // reusing the requestId for more asks + if (deltaContainers > 0 && numContainers != 0) { + throw new YarnException("Received new ask (" + + askSet.getNumContainers() + ") on top of existing (" + + numContainers + ") in key " + key); + } + } } // Cleanup properly if needed @@ -391,6 +559,20 @@ public class AMRMClientRelayer extends AbstractService askSet.cleanupZeroNonAnyRR(); } } + + // Initialize data for pending metrics for each new key + for (ResourceRequestSetKey key : nonZeroNewKeys) { + if(remotePendingAsks.containsKey(key)){ + this.askTimeStamp.put(key.getAllocationRequestId(), + System.currentTimeMillis()); + int count = this.remotePendingAsks.get(key).getNumContainers(); + this.pendingCountForMetrics.put(key.getAllocationRequestId(), count); + this.metrics.incrClientPending(this.rmId, + AMRMClientRelayerMetrics.getRequestType(key.getExeType()), count); + this.metrics.addRequestedQPS(this.rmId, + AMRMClientRelayerMetrics.getRequestType(key.getExeType()), count); + } + } } private void addResourceRequestToAsk(ResourceRequest remoteRequest) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java new file mode 100644 index 00000000000..6ce58519fa7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AMRMClientRelayerMetrics.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.metrics; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Metrics for FederationInterceptor Internals. + */ +@InterfaceAudience.Private +@Metrics(about = "Performance and usage metrics for YARN AMRMClientRelayer", + context = "fedr") +public final class AMRMClientRelayerMetrics implements MetricsSource{ + + /** + * Easier classification of request types for logging metrics. + */ + public enum RequestType { + Guaranteed, Opportunistic, Promote, Demote; + + @Override + public String toString() { + switch (this) { + case Guaranteed: + return "G"; + case Opportunistic: + return "O"; + case Promote: + return "P"; + case Demote: + return "D"; + default: + throw new IllegalArgumentException(); + } + } + } + + private static AtomicBoolean isInitialized = new AtomicBoolean(false); + + private static final MetricsInfo RECORD_INFO = + info("AMRMClientRelayerMetrics", + "Metrics for the Yarn AMRMClientRelayer"); + + private static volatile AMRMClientRelayerMetrics instance = null; + private static MetricsRegistry registry; + + // The metrics are set up as a map from string (typically sub cluster id) to + // request type (Guaranteed, Opp, Promote, Demote) to the counter. + // The counters are constructed lazily when the first metric entry + // comes in. + // For some metrics, request type is not applicable. + private final Map> + rmClientPending = new ConcurrentHashMap<>(); + + private final Map> fulfillLatency = + new ConcurrentHashMap<>(); + + private final Map> + requestedQps = new ConcurrentHashMap<>(); + + private final Map> + fulfilledQps = new ConcurrentHashMap<>(); + + private final Map rmMasterSlaveSwitch = + new ConcurrentHashMap<>(); + + private final Map heartbeatFailure = + new ConcurrentHashMap<>(); + + private final Map heartbeatSuccess = + new ConcurrentHashMap<>(); + private final Map heartbeatLatency = + new ConcurrentHashMap<>(); + + /** + * Initialize the singleton instance. 
+ * + * @return the singleton + */ + public static AMRMClientRelayerMetrics getInstance() { + if (!isInitialized.get()) { + synchronized (AMRMClientRelayerMetrics.class) { + if (instance == null) { + instance = new AMRMClientRelayerMetrics(); + DefaultMetricsSystem.instance().register(RECORD_INFO.name(), + RECORD_INFO.description(), instance); + isInitialized.set(true); + } + } + } + return instance; + } + + private AMRMClientRelayerMetrics() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "AMRMClientRelayer"); + } + + public static RequestType getRequestType(ExecutionType execType) { + if (execType == null || execType.equals(ExecutionType.GUARANTEED)) { + return RequestType.Guaranteed; + } + return RequestType.Opportunistic; + } + + @VisibleForTesting + protected MutableGaugeLong getPendingMetric(String instanceId, + RequestType type) { + synchronized (rmClientPending) { + if (!rmClientPending.containsKey(instanceId)) { + rmClientPending.put(instanceId, + new ConcurrentHashMap()); + } + if (!rmClientPending.get(instanceId).containsKey(type)) { + rmClientPending.get(instanceId).put(type, registry + .newGauge(type.toString() + "Pending" + instanceId, + "Remove pending of " + type + " for " + instanceId, 0L)); + } + } + return rmClientPending.get(instanceId).get(type); + } + + public void incrClientPending(String instanceId, RequestType type, int diff) { + getPendingMetric(instanceId, type).incr(diff); + } + + public void decrClientPending(String instanceId, RequestType type, int diff) { + getPendingMetric(instanceId, type).decr(diff); + } + + @VisibleForTesting + protected void setClientPending(String instanceId, RequestType type, + int val) { + getPendingMetric(instanceId, type).set(val); + } + + @VisibleForTesting + protected MutableQuantiles getFulfillLatencyMetric(String instanceId, + RequestType type) { + synchronized (fulfillLatency) { + if (!fulfillLatency.containsKey(instanceId)) { + fulfillLatency.put(instanceId, + new ConcurrentHashMap()); + } + if (!fulfillLatency.get(instanceId).containsKey(type)) { + fulfillLatency.get(instanceId).put(type, registry + .newQuantiles(type.toString() + "FulfillLatency" + instanceId, + "FulfillLatency of " + type + " for " + instanceId, "ops", + "latency", 60)); + } + } + return fulfillLatency.get(instanceId).get(type); + } + + public void addFulfillLatency(String instanceId, RequestType type, + long latency) { + getFulfillLatencyMetric(instanceId, type).add(latency); + } + + public void addFulfillLatency(String instanceId, ContainerUpdateType type, + long latency) { + switch(type) { + case DEMOTE_EXECUTION_TYPE: + addFulfillLatency(instanceId, RequestType.Demote, latency); + break; + case PROMOTE_EXECUTION_TYPE: + addFulfillLatency(instanceId, RequestType.Promote, latency); + break; + default: + break; + } + } + + @VisibleForTesting + protected MutableGaugeLong getRequestedQPSMetric(String instanceId, + RequestType type) { + synchronized (requestedQps) { + if (!requestedQps.containsKey(instanceId)) { + requestedQps.put(instanceId, + new ConcurrentHashMap()); + } + if (!requestedQps.get(instanceId).containsKey(type)) { + requestedQps.get(instanceId) + .put(type, registry.newGauge( + info(type.toString() + "RequestedOps" + instanceId, + "Requested operations of " + type + " for " + instanceId), + 0L)); + } + } + return requestedQps.get(instanceId).get(type); + } + + public void addRequestedQPS(String instanceId, RequestType type, + long numEntries) { + getRequestedQPSMetric(instanceId, type).incr(numEntries); + } + + 
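
Every per-RM, per-RequestType metric in this class repeats the lazy map-of-maps initialization shown above: a synchronized block guards first-time gauge registration, after which lookups go straight to the ConcurrentHashMap. The same idiom can be written more compactly with computeIfAbsent; this is an illustrative sketch only (the method name is hypothetical, not part of the patch):

    // Equivalent lazy lookup; relies on ConcurrentHashMap.computeIfAbsent
    // registering each gauge exactly once per (instanceId, type) pair.
    private MutableGaugeLong getPendingMetricCompact(String instanceId,
        RequestType type) {
      return rmClientPending
          .computeIfAbsent(instanceId, id -> new ConcurrentHashMap<>())
          .computeIfAbsent(type, t -> registry.newGauge(
              t.toString() + "Pending" + instanceId,
              "Pending " + t + " for " + instanceId, 0L));
    }
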
@VisibleForTesting + protected MutableGaugeLong getFulfilledQPSMetric(String instanceId, + RequestType type) { + synchronized (fulfilledQps) { + if (!fulfilledQps.containsKey(instanceId)) { + fulfilledQps.put(instanceId, + new ConcurrentHashMap()); + } + if (!fulfilledQps.get(instanceId).containsKey(type)) { + fulfilledQps.get(instanceId) + .put(type, registry.newGauge( + info(type.toString() + "FulfilledOps" + instanceId, + "Fulfilled operations of " + type + " for " + instanceId), + 0L)); + } + } + return fulfilledQps.get(instanceId).get(type); + } + + public void addFulfilledQPS(String instanceId, RequestType type, + long numEntries) { + getFulfilledQPSMetric(instanceId, type).incr(numEntries); + } + + public void addFulfilledQPS(String instanceId, ContainerUpdateType type, + long latency) { + switch(type) { + case DEMOTE_EXECUTION_TYPE: + addFulfilledQPS(instanceId, RequestType.Demote, latency); + break; + case PROMOTE_EXECUTION_TYPE: + addFulfilledQPS(instanceId, RequestType.Promote, latency); + break; + default: + break; + } + } + + public void incrClientPending(String scId, ContainerUpdateType type, + int diff) { + switch(type) { + case DEMOTE_EXECUTION_TYPE: + incrClientPending(scId, RequestType.Demote, diff); + break; + case PROMOTE_EXECUTION_TYPE: + incrClientPending(scId, RequestType.Promote, diff); + break; + default: + break; + } + } + + public void decrClientPending(String scId, ContainerUpdateType type, + int diff) { + switch(type) { + case DEMOTE_EXECUTION_TYPE: + decrClientPending(scId, RequestType.Demote, diff); + break; + case PROMOTE_EXECUTION_TYPE: + decrClientPending(scId, RequestType.Promote, diff); + break; + default: + break; + } + } + + @VisibleForTesting + protected MutableGaugeLong getRMMasterSlaveSwitchMetric( + String instanceId) { + synchronized (rmMasterSlaveSwitch) { + if (!rmMasterSlaveSwitch.containsKey(instanceId)) { + rmMasterSlaveSwitch.put(instanceId, registry.newGauge( + info("RMMasterSlaveSwitch" + instanceId, + "Number of RM master slave switch"), 0L)); + } + } + return rmMasterSlaveSwitch.get(instanceId); + } + + public void incrRMMasterSlaveSwitch(String instanceId) { + getRMMasterSlaveSwitchMetric(instanceId).incr(); + } + + @VisibleForTesting + protected MutableQuantiles getHeartbeatLatencyMetric(String instanceId) { + synchronized (heartbeatLatency) { + if (!heartbeatLatency.containsKey(instanceId)) { + heartbeatLatency.put(instanceId, registry + .newQuantiles("HeartbeatLatency" + instanceId, + "HeartbeatLatency for " + instanceId, "ops", "latency", 60)); + } + } + return heartbeatLatency.get(instanceId); + } + + @VisibleForTesting + protected MutableGaugeLong getHeartbeatFailureMetric( + String instanceId) { + synchronized (heartbeatFailure) { + if (!heartbeatFailure.containsKey(instanceId)) { + heartbeatFailure.put(instanceId, registry.newGauge( + info("HeartbeatFailure" + instanceId, + "Number of Heartbeat Failures"), 0L)); + } + } + return heartbeatFailure.get(instanceId); + } + + public void addHeartbeatFailure(String instanceId, long latency) { + getHeartbeatFailureMetric(instanceId).incr(); + + getHeartbeatLatencyMetric(instanceId).add(latency); + } + + @VisibleForTesting + protected MutableGaugeLong getHeartbeatSuccessMetric( + String instanceId) { + synchronized (heartbeatSuccess) { + if (!heartbeatSuccess.containsKey(instanceId)) { + heartbeatSuccess.put(instanceId, registry.newGauge( + info("HeartbeatSuccess" + instanceId, + "Number of Heartbeat"), 0L)); + } + } + return heartbeatSuccess.get(instanceId); + } + + public void 
addHeartbeatSuccess(String instanceId, long latency) { + getHeartbeatSuccessMetric(instanceId).incr(); + + getHeartbeatLatencyMetric(instanceId).add(latency); + } + + @Override + public void getMetrics(MetricsCollector builder, boolean all) { + registry.snapshot(builder.addRecord(registry.info().name()), all); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java new file mode 100644 index 00000000000..a67cf9304c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.metrics; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 5f9d81b881a..d708cedec17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -150,6 +150,7 @@ public class UnmanagedAMPoolManager extends AbstractService { * @param appNameSuffix application name suffix for the UAM * @param keepContainersAcrossApplicationAttempts keep container flag for UAM * recovery. 
+ * @param rmName name of the YarnRM * @see ApplicationSubmissionContext * #setKeepContainersAcrossApplicationAttempts(boolean) * @return uamId for the UAM @@ -159,7 +160,7 @@ public class UnmanagedAMPoolManager extends AbstractService { public String createAndRegisterNewUAM( RegisterApplicationMasterRequest registerRequest, Configuration conf, String queueName, String submitter, String appNameSuffix, - boolean keepContainersAcrossApplicationAttempts) + boolean keepContainersAcrossApplicationAttempts, String rmName) throws YarnException, IOException { ApplicationId appId = null; ApplicationClientProtocol rmClient; @@ -183,7 +184,7 @@ public class UnmanagedAMPoolManager extends AbstractService { // Launch the UAM in RM launchUAM(appId.toString(), conf, appId, queueName, submitter, - appNameSuffix, keepContainersAcrossApplicationAttempts); + appNameSuffix, keepContainersAcrossApplicationAttempts, rmName); // Register the UAM application registerApplicationMaster(appId.toString(), registerRequest); @@ -203,6 +204,7 @@ public class UnmanagedAMPoolManager extends AbstractService { * @param appNameSuffix application name suffix for the UAM * @param keepContainersAcrossApplicationAttempts keep container flag for UAM * recovery. + * @param rmName name of the YarnRM * @see ApplicationSubmissionContext * #setKeepContainersAcrossApplicationAttempts(boolean) * @return UAM token @@ -211,14 +213,15 @@ public class UnmanagedAMPoolManager extends AbstractService { */ public Token launchUAM(String uamId, Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) - throws YarnException, IOException { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, + String rmName) throws YarnException, IOException { if (this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " already exists"); } UnmanagedApplicationManager uam = createUAM(conf, appId, queueName, - submitter, appNameSuffix, keepContainersAcrossApplicationAttempts); + submitter, appNameSuffix, keepContainersAcrossApplicationAttempts, + rmName); // Put the UAM into map first before initializing it to avoid additional UAM // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); @@ -248,19 +251,20 @@ public class UnmanagedAMPoolManager extends AbstractService { * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM * @param uamToken UAM token + * @param rmName name of the YarnRM * @throws YarnException if fails * @throws IOException if fails */ - public void reAttachUAM(String uamId, Configuration conf, - ApplicationId appId, String queueName, String submitter, - String appNameSuffix, Token uamToken) + public void reAttachUAM(String uamId, Configuration conf, ApplicationId appId, + String queueName, String submitter, String appNameSuffix, + Token uamToken, String rmName) throws YarnException, IOException { if (this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " already exists"); } - UnmanagedApplicationManager uam = - createUAM(conf, appId, queueName, submitter, appNameSuffix, true); + UnmanagedApplicationManager uam = createUAM(conf, appId, queueName, + submitter, appNameSuffix, true, rmName); // Put the UAM into map first before initializing it to avoid additional UAM // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); @@ -287,14 +291,16 @@ public 
class UnmanagedAMPoolManager extends AbstractService { * @param submitter submitter name of the application * @param appNameSuffix application name suffix * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * @param rmName name of the YarnRM * @return the UAM instance */ @VisibleForTesting protected UnmanagedApplicationManager createUAM(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, + String rmName) { return new UnmanagedApplicationManager(conf, appId, queueName, submitter, - appNameSuffix, keepContainersAcrossApplicationAttempts); + appNameSuffix, keepContainersAcrossApplicationAttempts, rmName); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 78dcfb672b4..7c1e154c5e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -116,13 +116,14 @@ public class UnmanagedApplicationManager { * @param queueName the queue of the UAM * @param submitter user name of the app * @param appNameSuffix the app name suffix to use + * @param rmName name of the YarnRM * @param keepContainersAcrossApplicationAttempts keep container flag for UAM * recovery. 
See {@link ApplicationSubmissionContext * #setKeepContainersAcrossApplicationAttempts(boolean)} */ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix, - boolean keepContainersAcrossApplicationAttempts) { + boolean keepContainersAcrossApplicationAttempts, String rmName) { Preconditions.checkNotNull(conf, "Configuration cannot be null"); Preconditions.checkNotNull(appId, "ApplicationId cannot be null"); Preconditions.checkNotNull(submitter, "App submitter cannot be null"); @@ -132,9 +133,11 @@ public class UnmanagedApplicationManager { this.queueName = queueName; this.submitter = submitter; this.appNameSuffix = appNameSuffix; + this.userUgi = null; this.heartbeatHandler = new AMHeartbeatRequestHandler(this.conf, this.applicationId); - this.rmProxyRelayer = null; + this.rmProxyRelayer = + new AMRMClientRelayer(null, this.applicationId, rmName); this.connectionInitiated = false; this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); @@ -186,9 +189,8 @@ public class UnmanagedApplicationManager { throws IOException { this.userUgi = UserGroupInformation.createProxyUser( this.applicationId.toString(), UserGroupInformation.getCurrentUser()); - this.rmProxyRelayer = - new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class, - this.conf, this.userUgi, amrmToken), this.applicationId); + this.rmProxyRelayer.setRMClient(createRMProxy( + ApplicationMasterProtocol.class, this.conf, this.userUgi, amrmToken)); this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer); this.heartbeatHandler.setUGI(this.userUgi); @@ -245,7 +247,7 @@ public class UnmanagedApplicationManager { this.heartbeatHandler.shutdown(); - if (this.rmProxyRelayer == null) { + if (this.userUgi == null) { if (this.connectionInitiated) { // This is possible if the async launchUAM is still // blocked and retrying. Return a dummy response in this case. @@ -299,7 +301,7 @@ public class UnmanagedApplicationManager { // // In case 2, we have already saved the allocate request above, so if the // registration succeeds later, no request is lost. - if (this.userUgi == null) { + if (this.userUgi == null) { if (this.connectionInitiated) { LOG.info("Unmanaged AM still not successfully launched/registered yet." 
+ " Saving the allocate request and send later."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java index 4c84f0b9210..2c016d769fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java @@ -140,7 +140,7 @@ public class TestAMRMClientRelayer { this.conf = new Configuration(); this.mockAMS = new MockApplicationMasterService(); - this.relayer = new AMRMClientRelayer(this.mockAMS, null); + this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST"); this.relayer.init(conf); this.relayer.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java new file mode 100644 index 00000000000..ebbfae238d5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java @@ -0,0 +1,513 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; +import org.apache.hadoop.yarn.server.metrics.AMRMClientRelayerMetrics.RequestType; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Unit test for AMRMClientRelayer. + */ +public class TestAMRMClientRelayerMetrics { + + /** + * Mock AMS for easier testing and mocking of request/responses. 
+ */ + public static class MockApplicationMasterService + implements ApplicationMasterProtocol { + + private boolean failover = false; + private boolean exception = false; + private List lastAsk; + private List lastRelease; + private List lastUpdates; + private List lastBlacklistAdditions; + private List lastBlacklistRemovals; + private AllocateResponse response = AllocateResponse + .newInstance(0, null, null, new ArrayList(), + Resource.newInstance(0, 0), null, 0, null, null); + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + return null; + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + if (this.failover) { + this.failover = false; + throw new ApplicationMasterNotRegisteredException("Mock RM restarted"); + } + return null; + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + if (this.failover) { + this.failover = false; + throw new ApplicationMasterNotRegisteredException("Mock RM restarted"); + } + if(this.exception){ + this.exception = false; + throw new YarnException("Mock RM encountered exception"); + } + this.lastAsk = request.getAskList(); + this.lastRelease = request.getReleaseList(); + this.lastUpdates = request.getUpdateRequests(); + this.lastBlacklistAdditions = + request.getResourceBlacklistRequest().getBlacklistAdditions(); + this.lastBlacklistRemovals = + request.getResourceBlacklistRequest().getBlacklistRemovals(); + return response; + } + + public void setFailoverFlag() { + this.failover = true; + } + } + + private Configuration conf; + private MockApplicationMasterService mockAMS; + private String homeID = "home"; + private AMRMClientRelayer homeRelayer; + private String uamID = "uam"; + private AMRMClientRelayer uamRelayer; + + private List asks = new ArrayList<>(); + private List releases = new ArrayList<>(); + private List updates = new ArrayList<>(); + private List blacklistAdditions = new ArrayList<>(); + private List blacklistRemoval = new ArrayList<>(); + + @Before + public void setup() throws YarnException, IOException { + this.conf = new Configuration(); + + this.mockAMS = new MockApplicationMasterService(); + + this.homeRelayer = new AMRMClientRelayer(this.mockAMS, + ApplicationId.newInstance(0, 0), this.homeID); + this.homeRelayer.init(conf); + this.homeRelayer.start(); + + this.homeRelayer.registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance("", 0, "")); + + this.uamRelayer = new AMRMClientRelayer(this.mockAMS, + ApplicationId.newInstance(0, 0), this.uamID); + this.uamRelayer.init(conf); + this.uamRelayer.start(); + + this.uamRelayer.registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance("", 0, "")); + + clearAllocateRequestLists(); + + AMRMClientRelayerMetrics.getInstance() + .setClientPending(homeID, RequestType.Guaranteed, 0); + AMRMClientRelayerMetrics.getInstance() + .setClientPending(homeID, RequestType.Opportunistic, 0); + AMRMClientRelayerMetrics.getInstance() + .setClientPending(homeID, RequestType.Promote, 0); + AMRMClientRelayerMetrics.getInstance() + .setClientPending(homeID, RequestType.Demote, 0); + + AMRMClientRelayerMetrics.getInstance() + .setClientPending(uamID, RequestType.Guaranteed, 0); + AMRMClientRelayerMetrics.getInstance() + .setClientPending(uamID, RequestType.Opportunistic, 0); + 
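
// The AMRMClientRelayerMetrics singleton is shared across every test in
// this JVM, so setup() explicitly zeroes each gauge asserted on below;
// the Promote/Demote resets cover the container-update path, which
// reuses the same pending gauges as container asks.
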
AMRMClientRelayerMetrics.getInstance() + .setClientPending(uamID, RequestType.Promote, 0); + AMRMClientRelayerMetrics.getInstance() + .setClientPending(uamID, RequestType.Demote, 0); + } + + private AllocateRequest getAllocateRequest() { + // Need to create a new one every time because rather than directly + // referring the lists, the protobuf impl makes a copy of the lists + return AllocateRequest.newBuilder() + .responseId(0) + .progress(0).askList(asks) + .releaseList(new ArrayList<>(this.releases)) + .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance( + new ArrayList<>(this.blacklistAdditions), + new ArrayList<>(this.blacklistRemoval))) + .updateRequests(new ArrayList<>(this.updates)) + .build(); + } + + private void clearAllocateRequestLists() { + this.asks.clear(); + this.releases.clear(); + this.updates.clear(); + this.blacklistAdditions.clear(); + this.blacklistRemoval.clear(); + } + + private static UpdateContainerRequest createPromote(int id){ + return UpdateContainerRequest.newInstance(0, createContainerId(id), + ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Resource.newInstance(0, 0), + ExecutionType.GUARANTEED); + } + + private static UpdateContainerRequest createDemote(int id){ + return UpdateContainerRequest.newInstance(0, createContainerId(id), + ContainerUpdateType.DEMOTE_EXECUTION_TYPE, Resource.newInstance(0, 0), + ExecutionType.OPPORTUNISTIC); + } + + private static ContainerId createContainerId(int id) { + return ContainerId.newContainerId( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), + id); + } + + public ResourceRequest createResourceRequest(long id, String resource, + int memory, int vCores, int priority, ExecutionType execType, + int containers) { + ResourceRequest req = Records.newRecord(ResourceRequest.class); + req.setAllocationRequestId(id); + req.setResourceName(resource); + req.setCapability(Resource.newInstance(memory, vCores)); + req.setPriority(Priority.newInstance(priority)); + req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(execType)); + req.setNumContainers(containers); + return req; + } + + @Test + public void testGPending() throws YarnException, IOException { + // Ask for two containers, one with location preference + this.asks.add( + createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED, + 1)); + this.asks.add( + createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED, + 1)); + this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1, + ExecutionType.GUARANTEED, 2)); + this.homeRelayer.allocate(getAllocateRequest()); + + Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(homeID, RequestType.Guaranteed).value()); + + Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(uamID, RequestType.Guaranteed).value()); + + // Ask from the uam + this.uamRelayer.allocate(getAllocateRequest()); + + Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(homeID, RequestType.Guaranteed).value()); + + Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(uamID, RequestType.Guaranteed).value()); + + // Update the any to ask for an extra container + this.asks.get(2).setNumContainers(3); + this.homeRelayer.allocate(getAllocateRequest()); + + Assert.assertEquals(3, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(homeID, RequestType.Guaranteed).value()); + + Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(uamID, 
RequestType.Guaranteed).value()); + + // Update the any to ask to pretend a container was allocated + this.asks.get(2).setNumContainers(2); + this.homeRelayer.allocate(getAllocateRequest()); + + Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(homeID, RequestType.Guaranteed).value()); + + Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(uamID, RequestType.Guaranteed).value()); + } + + @Test + public void testPromotePending() throws YarnException, IOException { + // Ask to promote 3 containers + this.updates.add(createPromote(1)); + this.updates.add(createPromote(2)); + this.updates.add(createPromote(3)); + + this.homeRelayer.allocate(getAllocateRequest()); + + Assert.assertEquals(3, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(homeID, RequestType.Promote).value()); + + // Demote 2 containers, one of which is pending promote + this.updates.remove(createPromote(3)); + this.updates.add(createDemote(3)); + this.updates.add(createDemote(4)); + + this.homeRelayer.allocate(getAllocateRequest()); + + Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(homeID, RequestType.Promote).value()); + + // Let the RM respond with two successful promotions, one of which + // was pending promote + List updated = new ArrayList<>(); + updated.add(UpdatedContainer + .newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container + .newInstance(createContainerId(2), null, null, null, null, null))); + updated.add(UpdatedContainer + .newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container + .newInstance(createContainerId(5), null, null, null, null, null))); + this.mockAMS.response.setUpdatedContainers(updated); + + this.homeRelayer.allocate(getAllocateRequest()); + + Assert.assertEquals(1, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(homeID, RequestType.Promote).value()); + + // Remove the promoted container and clean up response + this.mockAMS.response.getUpdatedContainers().clear(); + this.updates.remove(createPromote(2)); + + // Let the RM respond with two completed containers, one of which was + // pending promote + List completed = new ArrayList<>(); + completed + .add(ContainerStatus.newInstance(createContainerId(1), null, "", 0)); + completed + .add(ContainerStatus.newInstance(createContainerId(6), null, "", 0)); + this.mockAMS.response.setCompletedContainersStatuses(completed); + + this.homeRelayer.allocate(getAllocateRequest()); + + Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(homeID, RequestType.Promote).value()); + } + + @Test + public void testCleanUpOnFinish() throws YarnException, IOException { + // Ask for two containers, one with location preference + this.asks.add( + createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED, + 1)); + this.asks.add( + createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED, + 1)); + this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1, + ExecutionType.GUARANTEED, 2)); + + // Ask to promote 3 containers + this.updates.add(createPromote(1)); + this.updates.add(createPromote(2)); + this.updates.add(createPromote(3)); + + // Run the allocate call to start tracking pending + this.homeRelayer.allocate(getAllocateRequest()); + + // After finish, the metrics should reset to zero + this.homeRelayer.shutdown(); + + Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance() + .getPendingMetric(homeID, RequestType.Guaranteed).value()); + + 
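
// shutdown() walks remotePendingChange as well as remotePendingAsks, so
// the pending-Promote gauge must also return to zero after the call.
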
+    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Promote).value());
+  }
+
+  @Test
+  public void testFailover() throws YarnException, IOException {
+    // Ask for two containers, one with location preference
+    this.asks.add(
+        createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
+            1));
+    this.asks.add(
+        createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
+            1));
+    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
+        ExecutionType.GUARANTEED, 2));
+
+    long previousSuccess = AMRMClientRelayerMetrics.getInstance()
+        .getHeartbeatSuccessMetric(homeID).value();
+    long previousFailover = AMRMClientRelayerMetrics.getInstance()
+        .getRMMasterSlaveSwitchMetric(homeID).value();
+    // Set failover to trigger
+    mockAMS.failover = true;
+    this.homeRelayer.allocate(getAllocateRequest());
+    // The failover metric should be incremented
+    Assert.assertEquals(++previousFailover,
+        AMRMClientRelayerMetrics.getInstance()
+            .getRMMasterSlaveSwitchMetric(homeID).value());
+
+    // The success metric should be incremented once
+    Assert.assertEquals(++previousSuccess,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatSuccessMetric(homeID).value());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Guaranteed).value());
+
+    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(uamID, RequestType.Guaranteed).value());
+
+    // Ask from the uam
+    this.uamRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Guaranteed).value());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(uamID, RequestType.Guaranteed).value());
+
+    // Update the ANY ask to request an extra container
+    this.asks.get(2).setNumContainers(3);
+    mockAMS.failover = true;
+    this.homeRelayer.allocate(getAllocateRequest());
+    // The failover metric should be incremented
+    Assert.assertEquals(++previousFailover,
+        AMRMClientRelayerMetrics.getInstance()
+            .getRMMasterSlaveSwitchMetric(homeID).value());
+
+    // The success metric should be incremented once
+    Assert.assertEquals(++previousSuccess,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatSuccessMetric(homeID).value());
+
+    Assert.assertEquals(3, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Guaranteed).value());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(uamID, RequestType.Guaranteed).value());
+
+    // Update the ANY ask to pretend a container was allocated
+    this.asks.get(2).setNumContainers(2);
+    mockAMS.failover = true;
+    this.homeRelayer.allocate(getAllocateRequest());
+    // The failover metric should be incremented
+    Assert.assertEquals(++previousFailover,
+        AMRMClientRelayerMetrics.getInstance()
+            .getRMMasterSlaveSwitchMetric(homeID).value());
+
+    // The success metric should be incremented once
+    Assert.assertEquals(++previousSuccess,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatSuccessMetric(homeID).value());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Guaranteed).value());
+
+    Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(uamID, RequestType.Guaranteed).value());
+
+    long previousFailure = AMRMClientRelayerMetrics.getInstance()
+        .getHeartbeatFailureMetric(homeID).value();
+
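+    // Next, fail the heartbeat without triggering a failover; only the
+    // failure metric should be incremented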
+    mockAMS.exception = true;
+    try {
+      this.homeRelayer.allocate(getAllocateRequest());
+      Assert.fail();
+    } catch (YarnException e) {
+    }
+    // The failover metric should not be incremented
+    Assert.assertEquals(previousFailover,
+        AMRMClientRelayerMetrics.getInstance()
+            .getRMMasterSlaveSwitchMetric(homeID).value());
+
+    // The success metric should not be incremented
+    Assert.assertEquals(previousSuccess,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatSuccessMetric(homeID).value());
+
+    // The failure metric should be incremented
+    Assert.assertEquals(++previousFailure,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatFailureMetric(homeID).value());
+
+    mockAMS.failover = true;
+    mockAMS.exception = true;
+    try {
+      this.homeRelayer.allocate(getAllocateRequest());
+      Assert.fail();
+    } catch (YarnException e) {
+    }
+    // The failover metric should be incremented
+    Assert.assertEquals(++previousFailover,
+        AMRMClientRelayerMetrics.getInstance()
+            .getRMMasterSlaveSwitchMetric(homeID).value());
+
+    // The success metric should not be incremented
+    Assert.assertEquals(previousSuccess,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatSuccessMetric(homeID).value());
+
+    // The failure metric should be incremented
+    Assert.assertEquals(++previousFailure,
+        AMRMClientRelayerMetrics.getInstance()
+            .getHeartbeatFailureMetric(homeID).value());
+  }
+
+  @Test
+  public void testNewEmptyRequest()
+      throws YarnException, IOException {
+    // Ask for zero containers
+    this.asks.add(createResourceRequest(1, ResourceRequest.ANY, 2048, 1, 1,
+        ExecutionType.GUARANTEED, 0));
+    this.homeRelayer.allocate(getAllocateRequest());
+
+    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(homeID, RequestType.Guaranteed).value());
+
+    Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
+        .getPendingMetric(uamID, RequestType.Guaranteed).value());
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
index 5848d3f8b74..b6bb0da243e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
@@ -374,7 +374,7 @@ public class TestUnmanagedApplicationManager {
         ApplicationId appId, String queueName, String submitter,
         String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
       super(conf, appId, queueName, submitter, appNameSuffix,
-          keepContainersAcrossApplicationAttempts);
+          keepContainersAcrossApplicationAttempts, "TEST");
     }
 
     @SuppressWarnings("unchecked")
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index eb818f1c69e..1bf882ffc41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -254,7 +254,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
     this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
-        ApplicationMasterProtocol.class, this.appOwner), appId);
+        ApplicationMasterProtocol.class, this.appOwner), appId,
+        this.homeSubClusterId.toString());
 
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -340,7 +341,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                   this.attemptId.getApplicationId(),
                   this.amRegistrationResponse.getQueue(),
                   getApplicationContext().getUser(), this.homeSubClusterId.getId(),
-                  entry.getValue());
+                  entry.getValue(), subClusterId.toString());
 
           this.secondaryRelayers.put(subClusterId.getId(),
               this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
@@ -666,7 +667,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
               uamPool.finishApplicationMaster(subClusterId, finishRequest);
 
           if (uamResponse.getIsUnregistered()) {
-            secondaryRelayers.remove(subClusterId);
+            AMRMClientRelayer relayer =
+                secondaryRelayers.remove(subClusterId);
+            if(relayer != null) {
+              relayer.shutdown();
+            }
 
             if (getNMStateStore() != null) {
               getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
@@ -753,6 +758,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
       this.threadpool = null;
     }
+    homeRMRelayer.shutdown();
+    for(AMRMClientRelayer relayer : secondaryRelayers.values()){
+      relayer.shutdown();
+    }
     super.shutdown();
   }
 
@@ -885,7 +894,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           uamPool.reAttachUAM(subClusterId.getId(), config, appId,
              amRegistrationResponse.getQueue(),
              getApplicationContext().getUser(), homeSubClusterId.getId(),
-             amrmToken);
+             amrmToken, subClusterId.toString());
 
           secondaryRelayers.put(subClusterId.getId(),
               uamPool.getAMRMClientRelayer(subClusterId.getId()));
@@ -1136,7 +1145,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
             token = uamPool.launchUAM(subClusterId, config,
                 attemptId.getApplicationId(),
                 amRegistrationResponse.getQueue(), appContext.getUser(),
-                homeSubClusterId.toString(), true);
+                homeSubClusterId.toString(), true, subClusterId);
 
             secondaryRelayers.put(subClusterId,
                 uamPool.getAMRMClientRelayer(subClusterId));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index 1088c698619..33617d41558 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -122,7 +122,8 @@ public class TestableFederationInterceptor extends FederationInterceptor {
   @Override
   public UnmanagedApplicationManager createUAM(Configuration conf,
       ApplicationId appId, String queueName, String submitter,
-      String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
+      String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
+      String rmId) {
     return new TestableUnmanagedApplicationManager(conf, appId, queueName,
         submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
   }
@@ -139,7 +140,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
         ApplicationId appId, String queueName, String submitter,
         String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
       super(conf, appId, queueName, submitter, appNameSuffix,
-          keepContainersAcrossApplicationAttempts);
+          keepContainersAcrossApplicationAttempts, "TEST");
     }
 
     /**