YARN-8658. [AMRMProxy] Metrics for AMRMClientRelayer inside FederationInterceptor. Contributed by Young Chen.
This commit is contained in:
parent
64c7a12b57
commit
02b9bfdf9e
|
@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||||
|
@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.metrics.AMRMClientRelayerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
|
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
|
||||||
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
|
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -98,6 +100,15 @@ public class AMRMClientRelayer extends AbstractService
|
||||||
private Set<ResourceRequest> ask =
|
private Set<ResourceRequest> ask =
|
||||||
new TreeSet<>(new ResourceRequest.ResourceRequestComparator());
|
new TreeSet<>(new ResourceRequest.ResourceRequestComparator());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Data structures for pending and allocate latency metrics. This only applies
|
||||||
|
* for requests with non-zero allocationRequestId.
|
||||||
|
*/
|
||||||
|
private Map<Long, Integer> pendingCountForMetrics = new HashMap<>();
|
||||||
|
private Map<Long, Long> askTimeStamp = new HashMap<>();
|
||||||
|
// List of allocated containerId to avoid double counting
|
||||||
|
private Set<ContainerId> knownContainers = new HashSet<>();
|
||||||
|
|
||||||
private Set<ContainerId> remotePendingRelease = new HashSet<>();
|
private Set<ContainerId> remotePendingRelease = new HashSet<>();
|
||||||
private Set<ContainerId> release = new HashSet<>();
|
private Set<ContainerId> release = new HashSet<>();
|
||||||
|
|
||||||
|
@ -108,6 +119,7 @@ public class AMRMClientRelayer extends AbstractService
|
||||||
private Map<ContainerId, UpdateContainerRequest> remotePendingChange =
|
private Map<ContainerId, UpdateContainerRequest> remotePendingChange =
|
||||||
new HashMap<>();
|
new HashMap<>();
|
||||||
private Map<ContainerId, UpdateContainerRequest> change = new HashMap<>();
|
private Map<ContainerId, UpdateContainerRequest> change = new HashMap<>();
|
||||||
|
private Map<ContainerId, Long> changeTimeStamp = new HashMap<>();
|
||||||
|
|
||||||
private Map<Set<String>, List<SchedulingRequest>> remotePendingSchedRequest =
|
private Map<Set<String>, List<SchedulingRequest>> remotePendingSchedRequest =
|
||||||
new HashMap<>();
|
new HashMap<>();
|
||||||
|
@ -119,16 +131,26 @@ public class AMRMClientRelayer extends AbstractService
|
||||||
// heartbeat
|
// heartbeat
|
||||||
private volatile int resetResponseId;
|
private volatile int resetResponseId;
|
||||||
|
|
||||||
|
private String rmId = "";
|
||||||
|
private volatile boolean shutdown = false;
|
||||||
|
|
||||||
|
private AMRMClientRelayerMetrics metrics;
|
||||||
|
|
||||||
public AMRMClientRelayer() {
|
public AMRMClientRelayer() {
|
||||||
super(AMRMClientRelayer.class.getName());
|
super(AMRMClientRelayer.class.getName());
|
||||||
this.resetResponseId = -1;
|
this.resetResponseId = -1;
|
||||||
|
this.metrics = AMRMClientRelayerMetrics.getInstance();
|
||||||
|
this.rmClient = null;
|
||||||
|
this.appId = null;
|
||||||
|
this.rmId = "";
|
||||||
}
|
}
|
||||||
|
|
||||||
public AMRMClientRelayer(ApplicationMasterProtocol rmClient,
|
public AMRMClientRelayer(ApplicationMasterProtocol rmClient,
|
||||||
ApplicationId appId) {
|
ApplicationId appId, String rmId) {
|
||||||
this();
|
this();
|
||||||
this.rmClient = rmClient;
|
this.rmClient = rmClient;
|
||||||
this.appId = appId;
|
this.appId = appId;
|
||||||
|
this.rmId = rmId;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -155,6 +177,7 @@ public class AMRMClientRelayer extends AbstractService
|
||||||
if (this.rmClient != null) {
|
if (this.rmClient != null) {
|
||||||
RPC.stopProxy(this.rmClient);
|
RPC.stopProxy(this.rmClient);
|
||||||
}
|
}
|
||||||
|
shutdown();
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,6 +186,49 @@ public class AMRMClientRelayer extends AbstractService
|
||||||
this.amRegistrationRequest = registerRequest;
|
this.amRegistrationRequest = registerRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setRMClient(ApplicationMasterProtocol client){
|
||||||
|
this.rmClient = client;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdown() {
|
||||||
|
// On finish, clear out our pending count from the metrics
|
||||||
|
// and set the shut down flag so no more pending requests get
|
||||||
|
// added
|
||||||
|
synchronized (this) {
|
||||||
|
if (this.shutdown) {
|
||||||
|
LOG.warn(
|
||||||
|
"Shutdown called twice for AMRMClientRelayer for RM " + this.rmId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.shutdown = true;
|
||||||
|
for (Map.Entry<ResourceRequestSetKey, ResourceRequestSet> entry
|
||||||
|
: this.remotePendingAsks .entrySet()) {
|
||||||
|
ResourceRequestSetKey key = entry.getKey();
|
||||||
|
if (key.getAllocationRequestId() == 0) {
|
||||||
|
this.metrics.decrClientPending(this.rmId,
|
||||||
|
AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
|
||||||
|
entry.getValue().getNumContainers());
|
||||||
|
} else {
|
||||||
|
this.askTimeStamp.remove(key.getAllocationRequestId());
|
||||||
|
Integer pending =
|
||||||
|
this.pendingCountForMetrics.remove(key.getAllocationRequestId());
|
||||||
|
if (pending == null) {
|
||||||
|
throw new YarnRuntimeException(
|
||||||
|
"pendingCountForMetrics not found for key " + key
|
||||||
|
+ " during shutdown");
|
||||||
|
}
|
||||||
|
this.metrics.decrClientPending(this.rmId,
|
||||||
|
AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
|
||||||
|
pending);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for(UpdateContainerRequest req : remotePendingChange.values()) {
|
||||||
|
this.metrics
|
||||||
|
.decrClientPending(rmId, req.getContainerUpdateType(), 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
RegisterApplicationMasterRequest request)
|
RegisterApplicationMasterRequest request)
|
||||||
|
@ -178,7 +244,8 @@ public class AMRMClientRelayer extends AbstractService
|
||||||
try {
|
try {
|
||||||
return this.rmClient.finishApplicationMaster(request);
|
return this.rmClient.finishApplicationMaster(request);
|
||||||
} catch (ApplicationMasterNotRegisteredException e) {
|
} catch (ApplicationMasterNotRegisteredException e) {
|
||||||
LOG.warn("Out of sync with RM for " + this.appId + ", hence resyncing.");
|
LOG.warn("Out of sync with RM " + rmId
|
||||||
|
+ " for " + this.appId + ", hence resyncing.");
|
||||||
// re register with RM
|
// re register with RM
|
||||||
registerApplicationMaster(this.amRegistrationRequest);
|
registerApplicationMaster(this.amRegistrationRequest);
|
||||||
return finishApplicationMaster(request);
|
return finishApplicationMaster(request);
|
||||||
|
@ -215,7 +282,23 @@ public class AMRMClientRelayer extends AbstractService
|
||||||
if (allocateRequest.getUpdateRequests() != null) {
|
if (allocateRequest.getUpdateRequests() != null) {
|
||||||
for (UpdateContainerRequest update : allocateRequest
|
for (UpdateContainerRequest update : allocateRequest
|
||||||
.getUpdateRequests()) {
|
.getUpdateRequests()) {
|
||||||
this.remotePendingChange.put(update.getContainerId(), update);
|
UpdateContainerRequest req =
|
||||||
|
this.remotePendingChange.put(update.getContainerId(), update);
|
||||||
|
this.changeTimeStamp
|
||||||
|
.put(update.getContainerId(), System.currentTimeMillis());
|
||||||
|
if (req == null) {
|
||||||
|
// If this is a brand new request, all we have to do is increment
|
||||||
|
this.metrics
|
||||||
|
.incrClientPending(rmId, update.getContainerUpdateType(), 1);
|
||||||
|
} else if (req.getContainerUpdateType() != update
|
||||||
|
.getContainerUpdateType()) {
|
||||||
|
// If this is replacing a request with a different update type, we
|
||||||
|
// need to decrement the replaced type
|
||||||
|
this.metrics
|
||||||
|
.decrClientPending(rmId, req.getContainerUpdateType(), 1);
|
||||||
|
this.metrics
|
||||||
|
.incrClientPending(rmId, update.getContainerUpdateType(), 1);
|
||||||
|
}
|
||||||
this.change.put(update.getContainerId(), update);
|
this.change.put(update.getContainerId(), update);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -232,141 +315,196 @@ public class AMRMClientRelayer extends AbstractService
|
||||||
public AllocateResponse allocate(AllocateRequest allocateRequest)
|
public AllocateResponse allocate(AllocateRequest allocateRequest)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
AllocateResponse allocateResponse = null;
|
AllocateResponse allocateResponse = null;
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
synchronized (this) {
|
||||||
|
if(this.shutdown){
|
||||||
|
throw new YarnException("Allocate called after AMRMClientRelayer for "
|
||||||
|
+ "RM " + rmId + " shutdown.");
|
||||||
|
}
|
||||||
|
addNewAllocateRequest(allocateRequest);
|
||||||
|
|
||||||
|
ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size());
|
||||||
|
for (ResourceRequest r : ask) {
|
||||||
|
// create a copy of ResourceRequest as we might change it while the
|
||||||
|
// RPC layer is using it to send info across
|
||||||
|
askList.add(ResourceRequest.clone(r));
|
||||||
|
}
|
||||||
|
|
||||||
|
allocateRequest = AllocateRequest.newBuilder()
|
||||||
|
.responseId(allocateRequest.getResponseId())
|
||||||
|
.progress(allocateRequest.getProgress()).askList(askList)
|
||||||
|
.releaseList(new ArrayList<>(this.release))
|
||||||
|
.resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
|
||||||
|
new ArrayList<>(this.blacklistAdditions),
|
||||||
|
new ArrayList<>(this.blacklistRemovals)))
|
||||||
|
.updateRequests(new ArrayList<>(this.change.values()))
|
||||||
|
.schedulingRequests(new ArrayList<>(this.schedulingRequest))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
if (this.resetResponseId != -1) {
|
||||||
|
LOG.info("Override allocate responseId from "
|
||||||
|
+ allocateRequest.getResponseId() + " to " + this.resetResponseId
|
||||||
|
+ " for " + this.appId);
|
||||||
|
allocateRequest.setResponseId(this.resetResponseId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do the actual allocate call
|
||||||
try {
|
try {
|
||||||
|
allocateResponse = this.rmClient.allocate(allocateRequest);
|
||||||
|
|
||||||
|
// Heartbeat succeeded, wipe out responseId overriding
|
||||||
|
this.resetResponseId = -1;
|
||||||
|
} catch (ApplicationMasterNotRegisteredException e) {
|
||||||
|
// This is a retriable exception - we will re register and make a
|
||||||
|
// recursive call to retry
|
||||||
|
LOG.warn("ApplicationMaster is out of sync with RM " + rmId
|
||||||
|
+ " for " + this.appId + ", hence resyncing.");
|
||||||
|
|
||||||
|
this.metrics.incrRMMasterSlaveSwitch(this.rmId);
|
||||||
|
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
addNewAllocateRequest(allocateRequest);
|
// Add all remotePending data into to-send data structures
|
||||||
|
for (ResourceRequestSet requestSet : this.remotePendingAsks
|
||||||
ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size());
|
.values()) {
|
||||||
for (ResourceRequest r : ask) {
|
for (ResourceRequest rr : requestSet.getRRs()) {
|
||||||
// create a copy of ResourceRequest as we might change it while the
|
addResourceRequestToAsk(rr);
|
||||||
// RPC layer is using it to send info across
|
}
|
||||||
askList.add(ResourceRequest.clone(r));
|
|
||||||
}
|
}
|
||||||
|
this.release.addAll(this.remotePendingRelease);
|
||||||
allocateRequest = AllocateRequest.newBuilder()
|
this.blacklistAdditions.addAll(this.remoteBlacklistedNodes);
|
||||||
.responseId(allocateRequest.getResponseId())
|
this.change.putAll(this.remotePendingChange);
|
||||||
.progress(allocateRequest.getProgress()).askList(askList)
|
for (List<SchedulingRequest> reqs : this.remotePendingSchedRequest
|
||||||
.releaseList(new ArrayList<>(this.release))
|
.values()) {
|
||||||
.resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
|
this.schedulingRequest.addAll(reqs);
|
||||||
new ArrayList<>(this.blacklistAdditions),
|
|
||||||
new ArrayList<>(this.blacklistRemovals)))
|
|
||||||
.updateRequests(new ArrayList<>(this.change.values()))
|
|
||||||
.schedulingRequests(new ArrayList<>(this.schedulingRequest))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
if (this.resetResponseId != -1) {
|
|
||||||
LOG.info("Override allocate responseId from "
|
|
||||||
+ allocateRequest.getResponseId() + " to " + this.resetResponseId
|
|
||||||
+ " for " + this.appId);
|
|
||||||
allocateRequest.setResponseId(this.resetResponseId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do the actual allocate call
|
// re-register with RM, then retry allocate recursively
|
||||||
try {
|
registerApplicationMaster(this.amRegistrationRequest);
|
||||||
allocateResponse = this.rmClient.allocate(allocateRequest);
|
// Reset responseId after re-register
|
||||||
|
allocateRequest.setResponseId(0);
|
||||||
|
allocateResponse = allocate(allocateRequest);
|
||||||
|
return allocateResponse;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
// Unexpected exception - rethrow and increment heart beat failure metric
|
||||||
|
this.metrics.addHeartbeatFailure(this.rmId,
|
||||||
|
System.currentTimeMillis() - startTime);
|
||||||
|
|
||||||
// Heartbeat succeeded, wipe out responseId overriding
|
// If RM is complaining about responseId out of sync, force reset next
|
||||||
this.resetResponseId = -1;
|
// time
|
||||||
} catch (ApplicationMasterNotRegisteredException e) {
|
if (t instanceof InvalidApplicationMasterRequestException) {
|
||||||
LOG.warn("ApplicationMaster is out of sync with RM for " + this.appId
|
int responseId = AMRMClientUtils
|
||||||
+ " hence resyncing.");
|
.parseExpectedResponseIdFromException(t.getMessage());
|
||||||
|
if (responseId != -1) {
|
||||||
|
this.resetResponseId = responseId;
|
||||||
|
LOG.info("ResponseId out of sync with RM, expect " + responseId
|
||||||
|
+ " but " + allocateRequest.getResponseId() + " used by "
|
||||||
|
+ this.appId + ". Will override in the next allocate.");
|
||||||
|
} else {
|
||||||
|
LOG.warn("Failed to parse expected responseId out of exception for "
|
||||||
|
+ this.appId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
synchronized (this) {
|
throw t;
|
||||||
// Add all remotePending data into to-send data structures
|
}
|
||||||
for (ResourceRequestSet requestSet : this.remotePendingAsks
|
|
||||||
.values()) {
|
synchronized (this) {
|
||||||
for (ResourceRequest rr : requestSet.getRRs()) {
|
if (this.shutdown) {
|
||||||
addResourceRequestToAsk(rr);
|
throw new YarnException("Allocate call succeeded for " + this.appId
|
||||||
|
+ " after AMRMClientRelayer for RM " + rmId + " shutdown.");
|
||||||
|
}
|
||||||
|
|
||||||
|
updateMetrics(allocateResponse, startTime);
|
||||||
|
|
||||||
|
AMRMClientUtils.removeFromOutstandingSchedulingRequests(
|
||||||
|
allocateResponse.getAllocatedContainers(),
|
||||||
|
this.remotePendingSchedRequest);
|
||||||
|
AMRMClientUtils.removeFromOutstandingSchedulingRequests(
|
||||||
|
allocateResponse.getContainersFromPreviousAttempts(),
|
||||||
|
this.remotePendingSchedRequest);
|
||||||
|
|
||||||
|
this.ask.clear();
|
||||||
|
this.release.clear();
|
||||||
|
|
||||||
|
this.blacklistAdditions.clear();
|
||||||
|
this.blacklistRemovals.clear();
|
||||||
|
|
||||||
|
this.change.clear();
|
||||||
|
this.schedulingRequest.clear();
|
||||||
|
return allocateResponse;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateMetrics(AllocateResponse allocateResponse,
|
||||||
|
long startTime) {
|
||||||
|
this.metrics.addHeartbeatSuccess(this.rmId,
|
||||||
|
System.currentTimeMillis() - startTime);
|
||||||
|
// Process the allocate response from RM
|
||||||
|
if (allocateResponse.getAllocatedContainers() != null) {
|
||||||
|
for (Container container : allocateResponse
|
||||||
|
.getAllocatedContainers()) {
|
||||||
|
// Do not update metrics aggressively for AllocationRequestId zero
|
||||||
|
// case. Also avoid double count due to re-send
|
||||||
|
if (this.knownContainers.add(container.getId())) {
|
||||||
|
this.metrics.addFulfilledQPS(this.rmId, AMRMClientRelayerMetrics
|
||||||
|
.getRequestType(container.getExecutionType()), 1);
|
||||||
|
if (container.getAllocationRequestId() != 0) {
|
||||||
|
Integer count = this.pendingCountForMetrics
|
||||||
|
.get(container.getAllocationRequestId());
|
||||||
|
if (count != null && count > 0) {
|
||||||
|
this.pendingCountForMetrics
|
||||||
|
.put(container.getAllocationRequestId(), --count);
|
||||||
|
this.metrics.decrClientPending(this.rmId,
|
||||||
|
AMRMClientRelayerMetrics
|
||||||
|
.getRequestType(container.getExecutionType()), 1);
|
||||||
|
this.metrics.addFulfillLatency(this.rmId,
|
||||||
|
AMRMClientRelayerMetrics
|
||||||
|
.getRequestType(container.getExecutionType()),
|
||||||
|
System.currentTimeMillis() - this.askTimeStamp
|
||||||
|
.get(container.getAllocationRequestId()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.release.addAll(this.remotePendingRelease);
|
|
||||||
this.blacklistAdditions.addAll(this.remoteBlacklistedNodes);
|
|
||||||
this.change.putAll(this.remotePendingChange);
|
|
||||||
for (List<SchedulingRequest> reqs : this.remotePendingSchedRequest
|
|
||||||
.values()) {
|
|
||||||
this.schedulingRequest.addAll(reqs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// re-register with RM, then retry allocate recursively
|
|
||||||
registerApplicationMaster(this.amRegistrationRequest);
|
|
||||||
// Reset responseId after re-register
|
|
||||||
allocateRequest.setResponseId(0);
|
|
||||||
return allocate(allocateRequest);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
|
|
||||||
// If RM is complaining about responseId out of sync, force reset next
|
|
||||||
// time
|
|
||||||
if (t instanceof InvalidApplicationMasterRequestException) {
|
|
||||||
int responseId = AMRMClientUtils
|
|
||||||
.parseExpectedResponseIdFromException(t.getMessage());
|
|
||||||
if (responseId != -1) {
|
|
||||||
this.resetResponseId = responseId;
|
|
||||||
LOG.info("ResponseId out of sync with RM, expect " + responseId
|
|
||||||
+ " but " + allocateRequest.getResponseId() + " used by "
|
|
||||||
+ this.appId + ". Will override in the next allocate.");
|
|
||||||
} else {
|
|
||||||
LOG.warn("Failed to parse expected responseId out of exception for "
|
|
||||||
+ this.appId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
throw t;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (this) {
|
|
||||||
// Process the allocate response from RM
|
|
||||||
if (allocateResponse.getCompletedContainersStatuses() != null) {
|
|
||||||
for (ContainerStatus container : allocateResponse
|
|
||||||
.getCompletedContainersStatuses()) {
|
|
||||||
this.remotePendingRelease.remove(container.getContainerId());
|
|
||||||
this.remotePendingChange.remove(container.getContainerId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (allocateResponse.getUpdatedContainers() != null) {
|
|
||||||
for (UpdatedContainer updatedContainer : allocateResponse
|
|
||||||
.getUpdatedContainers()) {
|
|
||||||
this.remotePendingChange
|
|
||||||
.remove(updatedContainer.getContainer().getId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
AMRMClientUtils.removeFromOutstandingSchedulingRequests(
|
|
||||||
allocateResponse.getAllocatedContainers(),
|
|
||||||
this.remotePendingSchedRequest);
|
|
||||||
AMRMClientUtils.removeFromOutstandingSchedulingRequests(
|
|
||||||
allocateResponse.getContainersFromPreviousAttempts(),
|
|
||||||
this.remotePendingSchedRequest);
|
|
||||||
}
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
synchronized (this) {
|
|
||||||
/*
|
|
||||||
* If allocateResponse is null, it means exception happened and RM did
|
|
||||||
* not accept the request. Don't clear any data structures so that they
|
|
||||||
* will be re-sent next time.
|
|
||||||
*
|
|
||||||
* Otherwise request was accepted by RM, we are safe to clear these.
|
|
||||||
*/
|
|
||||||
if (allocateResponse != null) {
|
|
||||||
this.ask.clear();
|
|
||||||
this.release.clear();
|
|
||||||
|
|
||||||
this.blacklistAdditions.clear();
|
|
||||||
this.blacklistRemovals.clear();
|
|
||||||
|
|
||||||
this.change.clear();
|
|
||||||
this.schedulingRequest.clear();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return allocateResponse;
|
if (allocateResponse.getCompletedContainersStatuses() != null) {
|
||||||
|
for (ContainerStatus container : allocateResponse
|
||||||
|
.getCompletedContainersStatuses()) {
|
||||||
|
this.remotePendingRelease.remove(container.getContainerId());
|
||||||
|
UpdateContainerRequest req =
|
||||||
|
this.remotePendingChange.remove(container.getContainerId());
|
||||||
|
if (req != null) {
|
||||||
|
this.metrics
|
||||||
|
.decrClientPending(rmId, req.getContainerUpdateType(), 1);
|
||||||
|
}
|
||||||
|
this.knownContainers.remove(container.getContainerId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (allocateResponse.getUpdatedContainers() != null) {
|
||||||
|
for (UpdatedContainer updatedContainer : allocateResponse
|
||||||
|
.getUpdatedContainers()) {
|
||||||
|
UpdateContainerRequest req = this.remotePendingChange
|
||||||
|
.remove(updatedContainer.getContainer().getId());
|
||||||
|
if (req != null) {
|
||||||
|
this.metrics
|
||||||
|
.decrClientPending(rmId, req.getContainerUpdateType(), 1);
|
||||||
|
this.metrics.addFulfillLatency(rmId, req.getContainerUpdateType(),
|
||||||
|
System.currentTimeMillis() - this.changeTimeStamp
|
||||||
|
.remove(req.getContainerId()));
|
||||||
|
this.metrics
|
||||||
|
.addFulfilledQPS(rmId, req.getContainerUpdateType(), 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addNewAsks(List<ResourceRequest> asks) throws YarnException {
|
private void addNewAsks(List<ResourceRequest> asks) throws YarnException {
|
||||||
Set<ResourceRequestSetKey> touchedKeys = new HashSet<>();
|
Set<ResourceRequestSetKey> touchedKeys = new HashSet<>();
|
||||||
|
Set<ResourceRequestSetKey> nonZeroNewKeys = new HashSet<>();
|
||||||
for (ResourceRequest rr : asks) {
|
for (ResourceRequest rr : asks) {
|
||||||
addResourceRequestToAsk(rr);
|
addResourceRequestToAsk(rr);
|
||||||
|
|
||||||
|
@ -377,8 +515,38 @@ public class AMRMClientRelayer extends AbstractService
|
||||||
if (askSet == null) {
|
if (askSet == null) {
|
||||||
askSet = new ResourceRequestSet(key);
|
askSet = new ResourceRequestSet(key);
|
||||||
this.remotePendingAsks.put(key, askSet);
|
this.remotePendingAsks.put(key, askSet);
|
||||||
|
if (key.getAllocationRequestId() != 0) {
|
||||||
|
nonZeroNewKeys.add(key);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int numContainers = askSet.getNumContainers();
|
||||||
askSet.addAndOverrideRR(rr);
|
askSet.addAndOverrideRR(rr);
|
||||||
|
int deltaContainers = askSet.getNumContainers() - numContainers;
|
||||||
|
|
||||||
|
if (key.getAllocationRequestId() == 0) {
|
||||||
|
// AllocationRequestId is zero, keep track of pending count in the
|
||||||
|
// delayed but correct way. Allocation latency is not supported
|
||||||
|
if (deltaContainers != 0) {
|
||||||
|
this.metrics.incrClientPending(this.rmId,
|
||||||
|
AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
|
||||||
|
deltaContainers);
|
||||||
|
if(deltaContainers > 0){
|
||||||
|
this.metrics.addRequestedQPS(this.rmId,
|
||||||
|
AMRMClientRelayerMetrics.getRequestType(key.getExeType()),
|
||||||
|
deltaContainers);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// AllocationRequestId is non-zero, we do pending decrement and latency
|
||||||
|
// aggressively. So don't update metrics here. Double check AM is not
|
||||||
|
// reusing the requestId for more asks
|
||||||
|
if (deltaContainers > 0 && numContainers != 0) {
|
||||||
|
throw new YarnException("Received new ask ("
|
||||||
|
+ askSet.getNumContainers() + ") on top of existing ("
|
||||||
|
+ numContainers + ") in key " + key);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup properly if needed
|
// Cleanup properly if needed
|
||||||
|
@ -391,6 +559,20 @@ public class AMRMClientRelayer extends AbstractService
|
||||||
askSet.cleanupZeroNonAnyRR();
|
askSet.cleanupZeroNonAnyRR();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize data for pending metrics for each new key
|
||||||
|
for (ResourceRequestSetKey key : nonZeroNewKeys) {
|
||||||
|
if(remotePendingAsks.containsKey(key)){
|
||||||
|
this.askTimeStamp.put(key.getAllocationRequestId(),
|
||||||
|
System.currentTimeMillis());
|
||||||
|
int count = this.remotePendingAsks.get(key).getNumContainers();
|
||||||
|
this.pendingCountForMetrics.put(key.getAllocationRequestId(), count);
|
||||||
|
this.metrics.incrClientPending(this.rmId,
|
||||||
|
AMRMClientRelayerMetrics.getRequestType(key.getExeType()), count);
|
||||||
|
this.metrics.addRequestedQPS(this.rmId,
|
||||||
|
AMRMClientRelayerMetrics.getRequestType(key.getExeType()), count);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
||||||
|
|
|
@ -0,0 +1,368 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.metrics;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSource;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Metrics for FederationInterceptor Internals.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@Metrics(about = "Performance and usage metrics for YARN AMRMClientRelayer",
|
||||||
|
context = "fedr")
|
||||||
|
public final class AMRMClientRelayerMetrics implements MetricsSource{
|
||||||
|
|
||||||
|
/**
 * Easier classification of request types for logging metrics.
 *
 * Each constant carries the one-letter code returned by {@link #toString()}:
 * "G", "O", "P" and "D" respectively.
 */
public enum RequestType {
  Guaranteed("G"), Opportunistic("O"), Promote("P"), Demote("D");

  /** One-letter code for this request type. */
  private final String shortName;

  RequestType(String shortName) {
    this.shortName = shortName;
  }

  /**
   * @return the one-letter code of this request type
   */
  @Override
  public String toString() {
    return shortName;
  }
}
|
||||||
|
|
||||||
|
private static AtomicBoolean isInitialized = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private static final MetricsInfo RECORD_INFO =
|
||||||
|
info("AMRMClientRelayerMetrics",
|
||||||
|
"Metrics for the Yarn AMRMClientRelayer");
|
||||||
|
|
||||||
|
private static volatile AMRMClientRelayerMetrics instance = null;
|
||||||
|
private static MetricsRegistry registry;
|
||||||
|
|
||||||
|
// The metrics are set up as a map from string (typically sub cluster id) to
|
||||||
|
// request type (Guaranteed, Opp, Promote, Demote) to the counter.
|
||||||
|
// The counters are constructed lazily when the first metric entry
|
||||||
|
// comes in.
|
||||||
|
// For some metrics, request type is not applicable.
|
||||||
|
private final Map<String, Map<RequestType, MutableGaugeLong>>
|
||||||
|
rmClientPending = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final Map<String, Map<RequestType, MutableQuantiles>> fulfillLatency =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final Map<String, Map<RequestType, MutableGaugeLong>>
|
||||||
|
requestedQps = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final Map<String, Map<RequestType, MutableGaugeLong>>
|
||||||
|
fulfilledQps = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final Map<String, MutableGaugeLong> rmMasterSlaveSwitch =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final Map<String, MutableGaugeLong> heartbeatFailure =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final Map<String, MutableGaugeLong> heartbeatSuccess =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
private final Map<String, MutableQuantiles> heartbeatLatency =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the singleton instance.
|
||||||
|
*
|
||||||
|
* @return the singleton
|
||||||
|
*/
|
||||||
|
public static AMRMClientRelayerMetrics getInstance() {
|
||||||
|
if (!isInitialized.get()) {
|
||||||
|
synchronized (AMRMClientRelayerMetrics.class) {
|
||||||
|
if (instance == null) {
|
||||||
|
instance = new AMRMClientRelayerMetrics();
|
||||||
|
DefaultMetricsSystem.instance().register(RECORD_INFO.name(),
|
||||||
|
RECORD_INFO.description(), instance);
|
||||||
|
isInitialized.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Private constructor; use {@link #getInstance()}. Creates the shared
 * registry for {@code RECORD_INFO} and tags it with "AMRMClientRelayer".
 */
private AMRMClientRelayerMetrics() {
  MetricsRegistry created = new MetricsRegistry(RECORD_INFO);
  // Publish to the static field before tagging, as all getters read it.
  registry = created;
  registry.tag(RECORD_INFO, "AMRMClientRelayer");
}
|
||||||
|
|
||||||
|
public static RequestType getRequestType(ExecutionType execType) {
|
||||||
|
if (execType == null || execType.equals(ExecutionType.GUARANTEED)) {
|
||||||
|
return RequestType.Guaranteed;
|
||||||
|
}
|
||||||
|
return RequestType.Opportunistic;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected MutableGaugeLong getPendingMetric(String instanceId,
|
||||||
|
RequestType type) {
|
||||||
|
synchronized (rmClientPending) {
|
||||||
|
if (!rmClientPending.containsKey(instanceId)) {
|
||||||
|
rmClientPending.put(instanceId,
|
||||||
|
new ConcurrentHashMap<RequestType, MutableGaugeLong>());
|
||||||
|
}
|
||||||
|
if (!rmClientPending.get(instanceId).containsKey(type)) {
|
||||||
|
rmClientPending.get(instanceId).put(type, registry
|
||||||
|
.newGauge(type.toString() + "Pending" + instanceId,
|
||||||
|
"Remove pending of " + type + " for " + instanceId, 0L));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rmClientPending.get(instanceId).get(type);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrClientPending(String instanceId, RequestType type, int diff) {
|
||||||
|
getPendingMetric(instanceId, type).incr(diff);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void decrClientPending(String instanceId, RequestType type, int diff) {
|
||||||
|
getPendingMetric(instanceId, type).decr(diff);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected void setClientPending(String instanceId, RequestType type,
|
||||||
|
int val) {
|
||||||
|
getPendingMetric(instanceId, type).set(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected MutableQuantiles getFulfillLatencyMetric(String instanceId,
|
||||||
|
RequestType type) {
|
||||||
|
synchronized (fulfillLatency) {
|
||||||
|
if (!fulfillLatency.containsKey(instanceId)) {
|
||||||
|
fulfillLatency.put(instanceId,
|
||||||
|
new ConcurrentHashMap<RequestType, MutableQuantiles>());
|
||||||
|
}
|
||||||
|
if (!fulfillLatency.get(instanceId).containsKey(type)) {
|
||||||
|
fulfillLatency.get(instanceId).put(type, registry
|
||||||
|
.newQuantiles(type.toString() + "FulfillLatency" + instanceId,
|
||||||
|
"FulfillLatency of " + type + " for " + instanceId, "ops",
|
||||||
|
"latency", 60));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fulfillLatency.get(instanceId).get(type);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addFulfillLatency(String instanceId, RequestType type,
|
||||||
|
long latency) {
|
||||||
|
getFulfillLatencyMetric(instanceId, type).add(latency);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addFulfillLatency(String instanceId, ContainerUpdateType type,
|
||||||
|
long latency) {
|
||||||
|
switch(type) {
|
||||||
|
case DEMOTE_EXECUTION_TYPE:
|
||||||
|
addFulfillLatency(instanceId, RequestType.Demote, latency);
|
||||||
|
break;
|
||||||
|
case PROMOTE_EXECUTION_TYPE:
|
||||||
|
addFulfillLatency(instanceId, RequestType.Promote, latency);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected MutableGaugeLong getRequestedQPSMetric(String instanceId,
|
||||||
|
RequestType type) {
|
||||||
|
synchronized (requestedQps) {
|
||||||
|
if (!requestedQps.containsKey(instanceId)) {
|
||||||
|
requestedQps.put(instanceId,
|
||||||
|
new ConcurrentHashMap<RequestType, MutableGaugeLong>());
|
||||||
|
}
|
||||||
|
if (!requestedQps.get(instanceId).containsKey(type)) {
|
||||||
|
requestedQps.get(instanceId)
|
||||||
|
.put(type, registry.newGauge(
|
||||||
|
info(type.toString() + "RequestedOps" + instanceId,
|
||||||
|
"Requested operations of " + type + " for " + instanceId),
|
||||||
|
0L));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return requestedQps.get(instanceId).get(type);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addRequestedQPS(String instanceId, RequestType type,
|
||||||
|
long numEntries) {
|
||||||
|
getRequestedQPSMetric(instanceId, type).incr(numEntries);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected MutableGaugeLong getFulfilledQPSMetric(String instanceId,
|
||||||
|
RequestType type) {
|
||||||
|
synchronized (fulfilledQps) {
|
||||||
|
if (!fulfilledQps.containsKey(instanceId)) {
|
||||||
|
fulfilledQps.put(instanceId,
|
||||||
|
new ConcurrentHashMap<RequestType, MutableGaugeLong>());
|
||||||
|
}
|
||||||
|
if (!fulfilledQps.get(instanceId).containsKey(type)) {
|
||||||
|
fulfilledQps.get(instanceId)
|
||||||
|
.put(type, registry.newGauge(
|
||||||
|
info(type.toString() + "FulfilledOps" + instanceId,
|
||||||
|
"Fulfilled operations of " + type + " for " + instanceId),
|
||||||
|
0L));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fulfilledQps.get(instanceId).get(type);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addFulfilledQPS(String instanceId, RequestType type,
|
||||||
|
long numEntries) {
|
||||||
|
getFulfilledQPSMetric(instanceId, type).incr(numEntries);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addFulfilledQPS(String instanceId, ContainerUpdateType type,
|
||||||
|
long latency) {
|
||||||
|
switch(type) {
|
||||||
|
case DEMOTE_EXECUTION_TYPE:
|
||||||
|
addFulfilledQPS(instanceId, RequestType.Demote, latency);
|
||||||
|
break;
|
||||||
|
case PROMOTE_EXECUTION_TYPE:
|
||||||
|
addFulfilledQPS(instanceId, RequestType.Promote, latency);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrClientPending(String scId, ContainerUpdateType type,
|
||||||
|
int diff) {
|
||||||
|
switch(type) {
|
||||||
|
case DEMOTE_EXECUTION_TYPE:
|
||||||
|
incrClientPending(scId, RequestType.Demote, diff);
|
||||||
|
break;
|
||||||
|
case PROMOTE_EXECUTION_TYPE:
|
||||||
|
incrClientPending(scId, RequestType.Promote, diff);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void decrClientPending(String scId, ContainerUpdateType type,
|
||||||
|
int diff) {
|
||||||
|
switch(type) {
|
||||||
|
case DEMOTE_EXECUTION_TYPE:
|
||||||
|
decrClientPending(scId, RequestType.Demote, diff);
|
||||||
|
break;
|
||||||
|
case PROMOTE_EXECUTION_TYPE:
|
||||||
|
decrClientPending(scId, RequestType.Promote, diff);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected MutableGaugeLong getRMMasterSlaveSwitchMetric(
|
||||||
|
String instanceId) {
|
||||||
|
synchronized (rmMasterSlaveSwitch) {
|
||||||
|
if (!rmMasterSlaveSwitch.containsKey(instanceId)) {
|
||||||
|
rmMasterSlaveSwitch.put(instanceId, registry.newGauge(
|
||||||
|
info("RMMasterSlaveSwitch" + instanceId,
|
||||||
|
"Number of RM master slave switch"), 0L));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rmMasterSlaveSwitch.get(instanceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrRMMasterSlaveSwitch(String instanceId) {
|
||||||
|
getRMMasterSlaveSwitchMetric(instanceId).incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected MutableQuantiles getHeartbeatLatencyMetric(String instanceId) {
|
||||||
|
synchronized (heartbeatLatency) {
|
||||||
|
if (!heartbeatLatency.containsKey(instanceId)) {
|
||||||
|
heartbeatLatency.put(instanceId, registry
|
||||||
|
.newQuantiles("HeartbeatLatency" + instanceId,
|
||||||
|
"HeartbeatLatency for " + instanceId, "ops", "latency", 60));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return heartbeatLatency.get(instanceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected MutableGaugeLong getHeartbeatFailureMetric(
|
||||||
|
String instanceId) {
|
||||||
|
synchronized (heartbeatFailure) {
|
||||||
|
if (!heartbeatFailure.containsKey(instanceId)) {
|
||||||
|
heartbeatFailure.put(instanceId, registry.newGauge(
|
||||||
|
info("HeartbeatFailure" + instanceId,
|
||||||
|
"Number of Heartbeat Failures"), 0L));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return heartbeatFailure.get(instanceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addHeartbeatFailure(String instanceId, long latency) {
|
||||||
|
getHeartbeatFailureMetric(instanceId).incr();
|
||||||
|
|
||||||
|
getHeartbeatLatencyMetric(instanceId).add(latency);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected MutableGaugeLong getHeartbeatSuccessMetric(
|
||||||
|
String instanceId) {
|
||||||
|
synchronized (heartbeatSuccess) {
|
||||||
|
if (!heartbeatSuccess.containsKey(instanceId)) {
|
||||||
|
heartbeatSuccess.put(instanceId, registry.newGauge(
|
||||||
|
info("HeartbeatSuccess" + instanceId,
|
||||||
|
"Number of Heartbeat"), 0L));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return heartbeatSuccess.get(instanceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addHeartbeatSuccess(String instanceId, long latency) {
|
||||||
|
getHeartbeatSuccessMetric(instanceId).incr();
|
||||||
|
|
||||||
|
getHeartbeatLatencyMetric(instanceId).add(latency);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void getMetrics(MetricsCollector builder, boolean all) {
|
||||||
|
registry.snapshot(builder.addRecord(registry.info().name()), all);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.metrics;
|
|
@ -150,6 +150,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
|
||||||
* @param appNameSuffix application name suffix for the UAM
|
* @param appNameSuffix application name suffix for the UAM
|
||||||
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
|
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
|
||||||
* recovery.
|
* recovery.
|
||||||
|
* @param rmName name of the YarnRM
|
||||||
* @see ApplicationSubmissionContext
|
* @see ApplicationSubmissionContext
|
||||||
* #setKeepContainersAcrossApplicationAttempts(boolean)
|
* #setKeepContainersAcrossApplicationAttempts(boolean)
|
||||||
* @return uamId for the UAM
|
* @return uamId for the UAM
|
||||||
|
@ -159,7 +160,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
|
||||||
public String createAndRegisterNewUAM(
|
public String createAndRegisterNewUAM(
|
||||||
RegisterApplicationMasterRequest registerRequest, Configuration conf,
|
RegisterApplicationMasterRequest registerRequest, Configuration conf,
|
||||||
String queueName, String submitter, String appNameSuffix,
|
String queueName, String submitter, String appNameSuffix,
|
||||||
boolean keepContainersAcrossApplicationAttempts)
|
boolean keepContainersAcrossApplicationAttempts, String rmName)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
ApplicationId appId = null;
|
ApplicationId appId = null;
|
||||||
ApplicationClientProtocol rmClient;
|
ApplicationClientProtocol rmClient;
|
||||||
|
@ -183,7 +184,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
|
||||||
|
|
||||||
// Launch the UAM in RM
|
// Launch the UAM in RM
|
||||||
launchUAM(appId.toString(), conf, appId, queueName, submitter,
|
launchUAM(appId.toString(), conf, appId, queueName, submitter,
|
||||||
appNameSuffix, keepContainersAcrossApplicationAttempts);
|
appNameSuffix, keepContainersAcrossApplicationAttempts, rmName);
|
||||||
|
|
||||||
// Register the UAM application
|
// Register the UAM application
|
||||||
registerApplicationMaster(appId.toString(), registerRequest);
|
registerApplicationMaster(appId.toString(), registerRequest);
|
||||||
|
@ -203,6 +204,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
|
||||||
* @param appNameSuffix application name suffix for the UAM
|
* @param appNameSuffix application name suffix for the UAM
|
||||||
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
|
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
|
||||||
* recovery.
|
* recovery.
|
||||||
|
* @param rmName name of the YarnRM
|
||||||
* @see ApplicationSubmissionContext
|
* @see ApplicationSubmissionContext
|
||||||
* #setKeepContainersAcrossApplicationAttempts(boolean)
|
* #setKeepContainersAcrossApplicationAttempts(boolean)
|
||||||
* @return UAM token
|
* @return UAM token
|
||||||
|
@ -211,14 +213,15 @@ public class UnmanagedAMPoolManager extends AbstractService {
|
||||||
*/
|
*/
|
||||||
public Token<AMRMTokenIdentifier> launchUAM(String uamId, Configuration conf,
|
public Token<AMRMTokenIdentifier> launchUAM(String uamId, Configuration conf,
|
||||||
ApplicationId appId, String queueName, String submitter,
|
ApplicationId appId, String queueName, String submitter,
|
||||||
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts)
|
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
|
||||||
throws YarnException, IOException {
|
String rmName) throws YarnException, IOException {
|
||||||
|
|
||||||
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
|
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
|
||||||
throw new YarnException("UAM " + uamId + " already exists");
|
throw new YarnException("UAM " + uamId + " already exists");
|
||||||
}
|
}
|
||||||
UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
|
UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
|
||||||
submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
|
submitter, appNameSuffix, keepContainersAcrossApplicationAttempts,
|
||||||
|
rmName);
|
||||||
// Put the UAM into map first before initializing it to avoid additional UAM
|
// Put the UAM into map first before initializing it to avoid additional UAM
|
||||||
// for the same uamId being created concurrently
|
// for the same uamId being created concurrently
|
||||||
this.unmanagedAppMasterMap.put(uamId, uam);
|
this.unmanagedAppMasterMap.put(uamId, uam);
|
||||||
|
@ -248,19 +251,20 @@ public class UnmanagedAMPoolManager extends AbstractService {
|
||||||
* @param submitter submitter name of the UAM
|
* @param submitter submitter name of the UAM
|
||||||
* @param appNameSuffix application name suffix for the UAM
|
* @param appNameSuffix application name suffix for the UAM
|
||||||
* @param uamToken UAM token
|
* @param uamToken UAM token
|
||||||
|
* @param rmName name of the YarnRM
|
||||||
* @throws YarnException if fails
|
* @throws YarnException if fails
|
||||||
* @throws IOException if fails
|
* @throws IOException if fails
|
||||||
*/
|
*/
|
||||||
public void reAttachUAM(String uamId, Configuration conf,
|
public void reAttachUAM(String uamId, Configuration conf, ApplicationId appId,
|
||||||
ApplicationId appId, String queueName, String submitter,
|
String queueName, String submitter, String appNameSuffix,
|
||||||
String appNameSuffix, Token<AMRMTokenIdentifier> uamToken)
|
Token<AMRMTokenIdentifier> uamToken, String rmName)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
|
|
||||||
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
|
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
|
||||||
throw new YarnException("UAM " + uamId + " already exists");
|
throw new YarnException("UAM " + uamId + " already exists");
|
||||||
}
|
}
|
||||||
UnmanagedApplicationManager uam =
|
UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
|
||||||
createUAM(conf, appId, queueName, submitter, appNameSuffix, true);
|
submitter, appNameSuffix, true, rmName);
|
||||||
// Put the UAM into map first before initializing it to avoid additional UAM
|
// Put the UAM into map first before initializing it to avoid additional UAM
|
||||||
// for the same uamId being created concurrently
|
// for the same uamId being created concurrently
|
||||||
this.unmanagedAppMasterMap.put(uamId, uam);
|
this.unmanagedAppMasterMap.put(uamId, uam);
|
||||||
|
@ -287,14 +291,16 @@ public class UnmanagedAMPoolManager extends AbstractService {
|
||||||
* @param submitter submitter name of the application
|
* @param submitter submitter name of the application
|
||||||
* @param appNameSuffix application name suffix
|
* @param appNameSuffix application name suffix
|
||||||
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
|
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
|
||||||
|
* @param rmName name of the YarnRM
|
||||||
* @return the UAM instance
|
* @return the UAM instance
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected UnmanagedApplicationManager createUAM(Configuration conf,
|
protected UnmanagedApplicationManager createUAM(Configuration conf,
|
||||||
ApplicationId appId, String queueName, String submitter,
|
ApplicationId appId, String queueName, String submitter,
|
||||||
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
|
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
|
||||||
|
String rmName) {
|
||||||
return new UnmanagedApplicationManager(conf, appId, queueName, submitter,
|
return new UnmanagedApplicationManager(conf, appId, queueName, submitter,
|
||||||
appNameSuffix, keepContainersAcrossApplicationAttempts);
|
appNameSuffix, keepContainersAcrossApplicationAttempts, rmName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -116,13 +116,14 @@ public class UnmanagedApplicationManager {
|
||||||
* @param queueName the queue of the UAM
|
* @param queueName the queue of the UAM
|
||||||
* @param submitter user name of the app
|
* @param submitter user name of the app
|
||||||
* @param appNameSuffix the app name suffix to use
|
* @param appNameSuffix the app name suffix to use
|
||||||
|
* @param rmName name of the YarnRM
|
||||||
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
|
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
|
||||||
* recovery. See {@link ApplicationSubmissionContext
|
* recovery. See {@link ApplicationSubmissionContext
|
||||||
* #setKeepContainersAcrossApplicationAttempts(boolean)}
|
* #setKeepContainersAcrossApplicationAttempts(boolean)}
|
||||||
*/
|
*/
|
||||||
public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
|
public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
|
||||||
String queueName, String submitter, String appNameSuffix,
|
String queueName, String submitter, String appNameSuffix,
|
||||||
boolean keepContainersAcrossApplicationAttempts) {
|
boolean keepContainersAcrossApplicationAttempts, String rmName) {
|
||||||
Preconditions.checkNotNull(conf, "Configuration cannot be null");
|
Preconditions.checkNotNull(conf, "Configuration cannot be null");
|
||||||
Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
|
Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
|
||||||
Preconditions.checkNotNull(submitter, "App submitter cannot be null");
|
Preconditions.checkNotNull(submitter, "App submitter cannot be null");
|
||||||
|
@ -132,9 +133,11 @@ public class UnmanagedApplicationManager {
|
||||||
this.queueName = queueName;
|
this.queueName = queueName;
|
||||||
this.submitter = submitter;
|
this.submitter = submitter;
|
||||||
this.appNameSuffix = appNameSuffix;
|
this.appNameSuffix = appNameSuffix;
|
||||||
|
this.userUgi = null;
|
||||||
this.heartbeatHandler =
|
this.heartbeatHandler =
|
||||||
new AMHeartbeatRequestHandler(this.conf, this.applicationId);
|
new AMHeartbeatRequestHandler(this.conf, this.applicationId);
|
||||||
this.rmProxyRelayer = null;
|
this.rmProxyRelayer =
|
||||||
|
new AMRMClientRelayer(null, this.applicationId, rmName);
|
||||||
this.connectionInitiated = false;
|
this.connectionInitiated = false;
|
||||||
this.registerRequest = null;
|
this.registerRequest = null;
|
||||||
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
|
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
|
||||||
|
@ -186,9 +189,8 @@ public class UnmanagedApplicationManager {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.userUgi = UserGroupInformation.createProxyUser(
|
this.userUgi = UserGroupInformation.createProxyUser(
|
||||||
this.applicationId.toString(), UserGroupInformation.getCurrentUser());
|
this.applicationId.toString(), UserGroupInformation.getCurrentUser());
|
||||||
this.rmProxyRelayer =
|
this.rmProxyRelayer.setRMClient(createRMProxy(
|
||||||
new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
|
ApplicationMasterProtocol.class, this.conf, this.userUgi, amrmToken));
|
||||||
this.conf, this.userUgi, amrmToken), this.applicationId);
|
|
||||||
|
|
||||||
this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer);
|
this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer);
|
||||||
this.heartbeatHandler.setUGI(this.userUgi);
|
this.heartbeatHandler.setUGI(this.userUgi);
|
||||||
|
@ -245,7 +247,7 @@ public class UnmanagedApplicationManager {
|
||||||
|
|
||||||
this.heartbeatHandler.shutdown();
|
this.heartbeatHandler.shutdown();
|
||||||
|
|
||||||
if (this.rmProxyRelayer == null) {
|
if (this.userUgi == null) {
|
||||||
if (this.connectionInitiated) {
|
if (this.connectionInitiated) {
|
||||||
// This is possible if the async launchUAM is still
|
// This is possible if the async launchUAM is still
|
||||||
// blocked and retrying. Return a dummy response in this case.
|
// blocked and retrying. Return a dummy response in this case.
|
||||||
|
@ -299,7 +301,7 @@ public class UnmanagedApplicationManager {
|
||||||
//
|
//
|
||||||
// In case 2, we have already save the allocate request above, so if the
|
// In case 2, we have already save the allocate request above, so if the
|
||||||
// registration succeed later, no request is lost.
|
// registration succeed later, no request is lost.
|
||||||
if (this.rmProxyRelayer == null) {
|
if (this.userUgi == null) {
|
||||||
if (this.connectionInitiated) {
|
if (this.connectionInitiated) {
|
||||||
LOG.info("Unmanaged AM still not successfully launched/registered yet."
|
LOG.info("Unmanaged AM still not successfully launched/registered yet."
|
||||||
+ " Saving the allocate request and send later.");
|
+ " Saving the allocate request and send later.");
|
||||||
|
|
|
@ -140,7 +140,7 @@ public class TestAMRMClientRelayer {
|
||||||
this.conf = new Configuration();
|
this.conf = new Configuration();
|
||||||
|
|
||||||
this.mockAMS = new MockApplicationMasterService();
|
this.mockAMS = new MockApplicationMasterService();
|
||||||
this.relayer = new AMRMClientRelayer(this.mockAMS, null);
|
this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST");
|
||||||
|
|
||||||
this.relayer.init(conf);
|
this.relayer.init(conf);
|
||||||
this.relayer.start();
|
this.relayer.start();
|
||||||
|
|
|
@ -0,0 +1,513 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.metrics;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
|
||||||
|
import org.apache.hadoop.yarn.server.metrics.AMRMClientRelayerMetrics.RequestType;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
 * Unit tests for the metrics published by AMRMClientRelayer.
|
||||||
|
*/
|
||||||
|
public class TestAMRMClientRelayerMetrics {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mock AMS for easier testing and mocking of request/responses.
|
||||||
|
*/
|
||||||
|
public static class MockApplicationMasterService
|
||||||
|
implements ApplicationMasterProtocol {
|
||||||
|
|
||||||
|
private boolean failover = false;
|
||||||
|
private boolean exception = false;
|
||||||
|
private List<ResourceRequest> lastAsk;
|
||||||
|
private List<ContainerId> lastRelease;
|
||||||
|
private List<UpdateContainerRequest> lastUpdates;
|
||||||
|
private List<String> lastBlacklistAdditions;
|
||||||
|
private List<String> lastBlacklistRemovals;
|
||||||
|
private AllocateResponse response = AllocateResponse
|
||||||
|
.newInstance(0, null, null, new ArrayList<NodeReport>(),
|
||||||
|
Resource.newInstance(0, 0), null, 0, null, null);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
|
RegisterApplicationMasterRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||||
|
FinishApplicationMasterRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
if (this.failover) {
|
||||||
|
this.failover = false;
|
||||||
|
throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AllocateResponse allocate(AllocateRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
if (this.failover) {
|
||||||
|
this.failover = false;
|
||||||
|
throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
|
||||||
|
}
|
||||||
|
if(this.exception){
|
||||||
|
this.exception = false;
|
||||||
|
throw new YarnException("Mock RM encountered exception");
|
||||||
|
}
|
||||||
|
this.lastAsk = request.getAskList();
|
||||||
|
this.lastRelease = request.getReleaseList();
|
||||||
|
this.lastUpdates = request.getUpdateRequests();
|
||||||
|
this.lastBlacklistAdditions =
|
||||||
|
request.getResourceBlacklistRequest().getBlacklistAdditions();
|
||||||
|
this.lastBlacklistRemovals =
|
||||||
|
request.getResourceBlacklistRequest().getBlacklistRemovals();
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFailoverFlag() {
|
||||||
|
this.failover = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private MockApplicationMasterService mockAMS;
|
||||||
|
private String homeID = "home";
|
||||||
|
private AMRMClientRelayer homeRelayer;
|
||||||
|
private String uamID = "uam";
|
||||||
|
private AMRMClientRelayer uamRelayer;
|
||||||
|
|
||||||
|
private List<ResourceRequest> asks = new ArrayList<>();
|
||||||
|
private List<ContainerId> releases = new ArrayList<>();
|
||||||
|
private List<UpdateContainerRequest> updates = new ArrayList<>();
|
||||||
|
private List<String> blacklistAdditions = new ArrayList<>();
|
||||||
|
private List<String> blacklistRemoval = new ArrayList<>();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws YarnException, IOException {
|
||||||
|
this.conf = new Configuration();
|
||||||
|
|
||||||
|
this.mockAMS = new MockApplicationMasterService();
|
||||||
|
|
||||||
|
this.homeRelayer = new AMRMClientRelayer(this.mockAMS,
|
||||||
|
ApplicationId.newInstance(0, 0), this.homeID);
|
||||||
|
this.homeRelayer.init(conf);
|
||||||
|
this.homeRelayer.start();
|
||||||
|
|
||||||
|
this.homeRelayer.registerApplicationMaster(
|
||||||
|
RegisterApplicationMasterRequest.newInstance("", 0, ""));
|
||||||
|
|
||||||
|
this.uamRelayer = new AMRMClientRelayer(this.mockAMS,
|
||||||
|
ApplicationId.newInstance(0, 0), this.uamID);
|
||||||
|
this.uamRelayer.init(conf);
|
||||||
|
this.uamRelayer.start();
|
||||||
|
|
||||||
|
this.uamRelayer.registerApplicationMaster(
|
||||||
|
RegisterApplicationMasterRequest.newInstance("", 0, ""));
|
||||||
|
|
||||||
|
clearAllocateRequestLists();
|
||||||
|
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.setClientPending(homeID, RequestType.Guaranteed, 0);
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.setClientPending(homeID, RequestType.Opportunistic, 0);
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.setClientPending(homeID, RequestType.Promote, 0);
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.setClientPending(homeID, RequestType.Demote, 0);
|
||||||
|
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.setClientPending(uamID, RequestType.Guaranteed, 0);
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.setClientPending(uamID, RequestType.Opportunistic, 0);
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.setClientPending(uamID, RequestType.Promote, 0);
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.setClientPending(uamID, RequestType.Demote, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private AllocateRequest getAllocateRequest() {
|
||||||
|
// Need to create a new one every time because rather than directly
|
||||||
|
// referring the lists, the protobuf impl makes a copy of the lists
|
||||||
|
return AllocateRequest.newBuilder()
|
||||||
|
.responseId(0)
|
||||||
|
.progress(0).askList(asks)
|
||||||
|
.releaseList(new ArrayList<>(this.releases))
|
||||||
|
.resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
|
||||||
|
new ArrayList<>(this.blacklistAdditions),
|
||||||
|
new ArrayList<>(this.blacklistRemoval)))
|
||||||
|
.updateRequests(new ArrayList<>(this.updates))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void clearAllocateRequestLists() {
|
||||||
|
this.asks.clear();
|
||||||
|
this.releases.clear();
|
||||||
|
this.updates.clear();
|
||||||
|
this.blacklistAdditions.clear();
|
||||||
|
this.blacklistRemoval.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static UpdateContainerRequest createPromote(int id){
|
||||||
|
return UpdateContainerRequest.newInstance(0, createContainerId(id),
|
||||||
|
ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Resource.newInstance(0, 0),
|
||||||
|
ExecutionType.GUARANTEED);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static UpdateContainerRequest createDemote(int id){
|
||||||
|
return UpdateContainerRequest.newInstance(0, createContainerId(id),
|
||||||
|
ContainerUpdateType.DEMOTE_EXECUTION_TYPE, Resource.newInstance(0, 0),
|
||||||
|
ExecutionType.OPPORTUNISTIC);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ContainerId createContainerId(int id) {
|
||||||
|
return ContainerId.newContainerId(
|
||||||
|
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
|
||||||
|
id);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResourceRequest createResourceRequest(long id, String resource,
|
||||||
|
int memory, int vCores, int priority, ExecutionType execType,
|
||||||
|
int containers) {
|
||||||
|
ResourceRequest req = Records.newRecord(ResourceRequest.class);
|
||||||
|
req.setAllocationRequestId(id);
|
||||||
|
req.setResourceName(resource);
|
||||||
|
req.setCapability(Resource.newInstance(memory, vCores));
|
||||||
|
req.setPriority(Priority.newInstance(priority));
|
||||||
|
req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(execType));
|
||||||
|
req.setNumContainers(containers);
|
||||||
|
return req;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGPending() throws YarnException, IOException {
|
||||||
|
// Ask for two containers, one with location preference
|
||||||
|
this.asks.add(
|
||||||
|
createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
|
||||||
|
1));
|
||||||
|
this.asks.add(
|
||||||
|
createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
|
||||||
|
1));
|
||||||
|
this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
|
||||||
|
ExecutionType.GUARANTEED, 2));
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(uamID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
// Ask from the uam
|
||||||
|
this.uamRelayer.allocate(getAllocateRequest());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(uamID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
// Update the any to ask for an extra container
|
||||||
|
this.asks.get(2).setNumContainers(3);
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
|
||||||
|
Assert.assertEquals(3, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(uamID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
// Update the any to ask to pretend a container was allocated
|
||||||
|
this.asks.get(2).setNumContainers(2);
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(uamID, RequestType.Guaranteed).value());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPromotePending() throws YarnException, IOException {
|
||||||
|
// Ask to promote 3 containers
|
||||||
|
this.updates.add(createPromote(1));
|
||||||
|
this.updates.add(createPromote(2));
|
||||||
|
this.updates.add(createPromote(3));
|
||||||
|
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
|
||||||
|
Assert.assertEquals(3, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Promote).value());
|
||||||
|
|
||||||
|
// Demote 2 containers, one of which is pending promote
|
||||||
|
this.updates.remove(createPromote(3));
|
||||||
|
this.updates.add(createDemote(3));
|
||||||
|
this.updates.add(createDemote(4));
|
||||||
|
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Promote).value());
|
||||||
|
|
||||||
|
// Let the RM respond with two successful promotions, one of which
|
||||||
|
// was pending promote
|
||||||
|
List<UpdatedContainer> updated = new ArrayList<>();
|
||||||
|
updated.add(UpdatedContainer
|
||||||
|
.newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container
|
||||||
|
.newInstance(createContainerId(2), null, null, null, null, null)));
|
||||||
|
updated.add(UpdatedContainer
|
||||||
|
.newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container
|
||||||
|
.newInstance(createContainerId(5), null, null, null, null, null)));
|
||||||
|
this.mockAMS.response.setUpdatedContainers(updated);
|
||||||
|
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
|
||||||
|
Assert.assertEquals(1, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Promote).value());
|
||||||
|
|
||||||
|
// Remove the promoted container and clean up response
|
||||||
|
this.mockAMS.response.getUpdatedContainers().clear();
|
||||||
|
this.updates.remove(createPromote(2));
|
||||||
|
|
||||||
|
// Let the RM respond with two completed containers, one of which was
|
||||||
|
// pending promote
|
||||||
|
List<ContainerStatus> completed = new ArrayList<>();
|
||||||
|
completed
|
||||||
|
.add(ContainerStatus.newInstance(createContainerId(1), null, "", 0));
|
||||||
|
completed
|
||||||
|
.add(ContainerStatus.newInstance(createContainerId(6), null, "", 0));
|
||||||
|
this.mockAMS.response.setCompletedContainersStatuses(completed);
|
||||||
|
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
|
||||||
|
Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Promote).value());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCleanUpOnFinish() throws YarnException, IOException {
|
||||||
|
// Ask for two containers, one with location preference
|
||||||
|
this.asks.add(
|
||||||
|
createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
|
||||||
|
1));
|
||||||
|
this.asks.add(
|
||||||
|
createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
|
||||||
|
1));
|
||||||
|
this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
|
||||||
|
ExecutionType.GUARANTEED, 2));
|
||||||
|
|
||||||
|
// Ask to promote 3 containers
|
||||||
|
this.updates.add(createPromote(1));
|
||||||
|
this.updates.add(createPromote(2));
|
||||||
|
this.updates.add(createPromote(3));
|
||||||
|
|
||||||
|
// Run the allocate call to start tracking pending
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
|
||||||
|
// After finish, the metrics should reset to zero
|
||||||
|
this.homeRelayer.shutdown();
|
||||||
|
|
||||||
|
Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Promote).value());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFailover() throws YarnException, IOException {
|
||||||
|
// Ask for two containers, one with location preference
|
||||||
|
this.asks.add(
|
||||||
|
createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
|
||||||
|
1));
|
||||||
|
this.asks.add(
|
||||||
|
createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
|
||||||
|
1));
|
||||||
|
this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
|
||||||
|
ExecutionType.GUARANTEED, 2));
|
||||||
|
|
||||||
|
long previousSuccess = AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getHeartbeatSuccessMetric(homeID).value();
|
||||||
|
long previousFailover = AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getRMMasterSlaveSwitchMetric(homeID).value();
|
||||||
|
// Set failover to trigger
|
||||||
|
mockAMS.failover = true;
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
// The failover metric should be incremented
|
||||||
|
Assert.assertEquals(++previousFailover,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getRMMasterSlaveSwitchMetric(homeID).value());
|
||||||
|
|
||||||
|
// The success metric should be incremented once
|
||||||
|
Assert.assertEquals(++previousSuccess,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getHeartbeatSuccessMetric(homeID).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(uamID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
// Ask from the uam
|
||||||
|
this.uamRelayer.allocate(getAllocateRequest());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(uamID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
// Update the any to ask for an extra container
|
||||||
|
this.asks.get(2).setNumContainers(3);
|
||||||
|
mockAMS.failover = true;
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
// The failover metric should be incremented
|
||||||
|
Assert.assertEquals(++previousFailover,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getRMMasterSlaveSwitchMetric(homeID).value());
|
||||||
|
|
||||||
|
// The success metric should be incremented once
|
||||||
|
Assert.assertEquals(++previousSuccess,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getHeartbeatSuccessMetric(homeID).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(3, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(uamID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
// Update the any to ask to pretend a container was allocated
|
||||||
|
this.asks.get(2).setNumContainers(2);
|
||||||
|
mockAMS.failover = true;
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
// The failover metric should be incremented
|
||||||
|
Assert.assertEquals(++previousFailover,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getRMMasterSlaveSwitchMetric(homeID).value());
|
||||||
|
|
||||||
|
// The success metric should be incremented once
|
||||||
|
Assert.assertEquals(++previousSuccess,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getHeartbeatSuccessMetric(homeID).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(uamID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
long previousFailure = AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getHeartbeatFailureMetric(homeID).value();
|
||||||
|
|
||||||
|
mockAMS.exception = true;
|
||||||
|
try{
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
Assert.fail();
|
||||||
|
} catch (YarnException e){
|
||||||
|
}
|
||||||
|
// The failover metric should not be incremented
|
||||||
|
Assert.assertEquals(previousFailover,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getRMMasterSlaveSwitchMetric(homeID).value());
|
||||||
|
|
||||||
|
// The success metric should not be incremented
|
||||||
|
Assert.assertEquals(previousSuccess,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getHeartbeatSuccessMetric(homeID).value());
|
||||||
|
|
||||||
|
// The failure metric should be incremented
|
||||||
|
Assert.assertEquals(++previousFailure,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getHeartbeatFailureMetric(homeID).value());
|
||||||
|
|
||||||
|
mockAMS.failover = true;
|
||||||
|
mockAMS.exception = true;
|
||||||
|
try{
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
Assert.fail();
|
||||||
|
} catch (YarnException e){
|
||||||
|
}
|
||||||
|
// The failover metric should be incremented
|
||||||
|
Assert.assertEquals(++previousFailover,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getRMMasterSlaveSwitchMetric(homeID).value());
|
||||||
|
|
||||||
|
// The success metric should not be incremented
|
||||||
|
Assert.assertEquals(previousSuccess,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getHeartbeatSuccessMetric(homeID).value());
|
||||||
|
|
||||||
|
// The failure metric should be incremented
|
||||||
|
Assert.assertEquals(++previousFailure,
|
||||||
|
AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getHeartbeatFailureMetric(homeID).value());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNewEmptyRequest()
|
||||||
|
throws YarnException, IOException {
|
||||||
|
// Ask for zero containers
|
||||||
|
this.asks.add(createResourceRequest(1, ResourceRequest.ANY, 2048, 1, 1,
|
||||||
|
ExecutionType.GUARANTEED, 0));
|
||||||
|
this.homeRelayer.allocate(getAllocateRequest());
|
||||||
|
|
||||||
|
Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(homeID, RequestType.Guaranteed).value());
|
||||||
|
|
||||||
|
Assert.assertEquals(0, AMRMClientRelayerMetrics.getInstance()
|
||||||
|
.getPendingMetric(uamID, RequestType.Guaranteed).value());
|
||||||
|
}
|
||||||
|
}
|
|
@ -374,7 +374,7 @@ public class TestUnmanagedApplicationManager {
|
||||||
ApplicationId appId, String queueName, String submitter,
|
ApplicationId appId, String queueName, String submitter,
|
||||||
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
|
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
|
||||||
super(conf, appId, queueName, submitter, appNameSuffix,
|
super(conf, appId, queueName, submitter, appNameSuffix,
|
||||||
keepContainersAcrossApplicationAttempts);
|
keepContainersAcrossApplicationAttempts, "TEST");
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
|
@ -254,7 +254,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
||||||
this.homeSubClusterId =
|
this.homeSubClusterId =
|
||||||
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
|
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
|
||||||
this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
|
this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
|
||||||
ApplicationMasterProtocol.class, this.appOwner), appId);
|
ApplicationMasterProtocol.class, this.appOwner), appId,
|
||||||
|
this.homeSubClusterId.toString());
|
||||||
|
|
||||||
this.federationFacade = FederationStateStoreFacade.getInstance();
|
this.federationFacade = FederationStateStoreFacade.getInstance();
|
||||||
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
|
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
|
||||||
|
@ -340,7 +341,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
||||||
this.attemptId.getApplicationId(),
|
this.attemptId.getApplicationId(),
|
||||||
this.amRegistrationResponse.getQueue(),
|
this.amRegistrationResponse.getQueue(),
|
||||||
getApplicationContext().getUser(), this.homeSubClusterId.getId(),
|
getApplicationContext().getUser(), this.homeSubClusterId.getId(),
|
||||||
entry.getValue());
|
entry.getValue(), subClusterId.toString());
|
||||||
|
|
||||||
this.secondaryRelayers.put(subClusterId.getId(),
|
this.secondaryRelayers.put(subClusterId.getId(),
|
||||||
this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
|
this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
|
||||||
|
@ -666,7 +667,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
||||||
uamPool.finishApplicationMaster(subClusterId, finishRequest);
|
uamPool.finishApplicationMaster(subClusterId, finishRequest);
|
||||||
|
|
||||||
if (uamResponse.getIsUnregistered()) {
|
if (uamResponse.getIsUnregistered()) {
|
||||||
secondaryRelayers.remove(subClusterId);
|
AMRMClientRelayer relayer =
|
||||||
|
secondaryRelayers.remove(subClusterId);
|
||||||
|
if(relayer != null) {
|
||||||
|
relayer.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
if (getNMStateStore() != null) {
|
if (getNMStateStore() != null) {
|
||||||
getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
|
getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
|
||||||
|
@ -753,6 +758,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
||||||
}
|
}
|
||||||
this.threadpool = null;
|
this.threadpool = null;
|
||||||
}
|
}
|
||||||
|
homeRMRelayer.shutdown();
|
||||||
|
for(AMRMClientRelayer relayer : secondaryRelayers.values()){
|
||||||
|
relayer.shutdown();
|
||||||
|
}
|
||||||
super.shutdown();
|
super.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -885,7 +894,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
||||||
uamPool.reAttachUAM(subClusterId.getId(), config, appId,
|
uamPool.reAttachUAM(subClusterId.getId(), config, appId,
|
||||||
amRegistrationResponse.getQueue(),
|
amRegistrationResponse.getQueue(),
|
||||||
getApplicationContext().getUser(), homeSubClusterId.getId(),
|
getApplicationContext().getUser(), homeSubClusterId.getId(),
|
||||||
amrmToken);
|
amrmToken, subClusterId.toString());
|
||||||
|
|
||||||
secondaryRelayers.put(subClusterId.getId(),
|
secondaryRelayers.put(subClusterId.getId(),
|
||||||
uamPool.getAMRMClientRelayer(subClusterId.getId()));
|
uamPool.getAMRMClientRelayer(subClusterId.getId()));
|
||||||
|
@ -1136,7 +1145,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
||||||
token = uamPool.launchUAM(subClusterId, config,
|
token = uamPool.launchUAM(subClusterId, config,
|
||||||
attemptId.getApplicationId(),
|
attemptId.getApplicationId(),
|
||||||
amRegistrationResponse.getQueue(), appContext.getUser(),
|
amRegistrationResponse.getQueue(), appContext.getUser(),
|
||||||
homeSubClusterId.toString(), true);
|
homeSubClusterId.toString(), true, subClusterId);
|
||||||
|
|
||||||
secondaryRelayers.put(subClusterId,
|
secondaryRelayers.put(subClusterId,
|
||||||
uamPool.getAMRMClientRelayer(subClusterId));
|
uamPool.getAMRMClientRelayer(subClusterId));
|
||||||
|
|
|
@ -122,7 +122,8 @@ public class TestableFederationInterceptor extends FederationInterceptor {
|
||||||
@Override
|
@Override
|
||||||
public UnmanagedApplicationManager createUAM(Configuration conf,
|
public UnmanagedApplicationManager createUAM(Configuration conf,
|
||||||
ApplicationId appId, String queueName, String submitter,
|
ApplicationId appId, String queueName, String submitter,
|
||||||
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
|
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
|
||||||
|
String rmId) {
|
||||||
return new TestableUnmanagedApplicationManager(conf, appId, queueName,
|
return new TestableUnmanagedApplicationManager(conf, appId, queueName,
|
||||||
submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
|
submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
|
||||||
}
|
}
|
||||||
|
@ -139,7 +140,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
|
||||||
ApplicationId appId, String queueName, String submitter,
|
ApplicationId appId, String queueName, String submitter,
|
||||||
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
|
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
|
||||||
super(conf, appId, queueName, submitter, appNameSuffix,
|
super(conf, appId, queueName, submitter, appNameSuffix,
|
||||||
keepContainersAcrossApplicationAttempts);
|
keepContainersAcrossApplicationAttempts, "TEST");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue