YARN-7095. Federation: routing getNode/getNodes/getMetrics REST invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru).
This commit is contained in:
parent
d4417dae4f
commit
bac4e8cca8
|
@ -31,35 +31,35 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
|
|||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class ClusterMetricsInfo {
|
||||
|
||||
protected int appsSubmitted;
|
||||
protected int appsCompleted;
|
||||
protected int appsPending;
|
||||
protected int appsRunning;
|
||||
protected int appsFailed;
|
||||
protected int appsKilled;
|
||||
private int appsSubmitted;
|
||||
private int appsCompleted;
|
||||
private int appsPending;
|
||||
private int appsRunning;
|
||||
private int appsFailed;
|
||||
private int appsKilled;
|
||||
|
||||
protected long reservedMB;
|
||||
protected long availableMB;
|
||||
protected long allocatedMB;
|
||||
private long reservedMB;
|
||||
private long availableMB;
|
||||
private long allocatedMB;
|
||||
|
||||
protected long reservedVirtualCores;
|
||||
protected long availableVirtualCores;
|
||||
protected long allocatedVirtualCores;
|
||||
private long reservedVirtualCores;
|
||||
private long availableVirtualCores;
|
||||
private long allocatedVirtualCores;
|
||||
|
||||
protected int containersAllocated;
|
||||
protected int containersReserved;
|
||||
protected int containersPending;
|
||||
private int containersAllocated;
|
||||
private int containersReserved;
|
||||
private int containersPending;
|
||||
|
||||
protected long totalMB;
|
||||
protected long totalVirtualCores;
|
||||
protected int totalNodes;
|
||||
protected int lostNodes;
|
||||
protected int unhealthyNodes;
|
||||
protected int decommissioningNodes;
|
||||
protected int decommissionedNodes;
|
||||
protected int rebootedNodes;
|
||||
protected int activeNodes;
|
||||
protected int shutdownNodes;
|
||||
private long totalMB;
|
||||
private long totalVirtualCores;
|
||||
private int totalNodes;
|
||||
private int lostNodes;
|
||||
private int unhealthyNodes;
|
||||
private int decommissioningNodes;
|
||||
private int decommissionedNodes;
|
||||
private int rebootedNodes;
|
||||
private int activeNodes;
|
||||
private int shutdownNodes;
|
||||
|
||||
public ClusterMetricsInfo() {
|
||||
} // JAXB needs this
|
||||
|
@ -93,8 +93,8 @@ public class ClusterMetricsInfo {
|
|||
|
||||
if (rs instanceof CapacityScheduler) {
|
||||
this.totalMB = availableMB + allocatedMB + reservedMB;
|
||||
this.totalVirtualCores = availableVirtualCores + allocatedVirtualCores
|
||||
+ containersReserved;
|
||||
this.totalVirtualCores =
|
||||
availableVirtualCores + allocatedVirtualCores + containersReserved;
|
||||
} else {
|
||||
this.totalMB = availableMB + allocatedMB;
|
||||
this.totalVirtualCores = availableVirtualCores + allocatedVirtualCores;
|
||||
|
@ -210,4 +210,104 @@ public class ClusterMetricsInfo {
|
|||
return this.shutdownNodes;
|
||||
}
|
||||
|
||||
public void setContainersReserved(int containersReserved) {
|
||||
this.containersReserved = containersReserved;
|
||||
}
|
||||
|
||||
public void setContainersPending(int containersPending) {
|
||||
this.containersPending = containersPending;
|
||||
}
|
||||
|
||||
public void setAppsSubmitted(int appsSubmitted) {
|
||||
this.appsSubmitted = appsSubmitted;
|
||||
}
|
||||
|
||||
public void setAppsCompleted(int appsCompleted) {
|
||||
this.appsCompleted = appsCompleted;
|
||||
}
|
||||
|
||||
public void setAppsPending(int appsPending) {
|
||||
this.appsPending = appsPending;
|
||||
}
|
||||
|
||||
public void setAppsRunning(int appsRunning) {
|
||||
this.appsRunning = appsRunning;
|
||||
}
|
||||
|
||||
public void setAppsFailed(int appsFailed) {
|
||||
this.appsFailed = appsFailed;
|
||||
}
|
||||
|
||||
public void setAppsKilled(int appsKilled) {
|
||||
this.appsKilled = appsKilled;
|
||||
}
|
||||
|
||||
public void setReservedMB(long reservedMB) {
|
||||
this.reservedMB = reservedMB;
|
||||
}
|
||||
|
||||
public void setAvailableMB(long availableMB) {
|
||||
this.availableMB = availableMB;
|
||||
}
|
||||
|
||||
public void setAllocatedMB(long allocatedMB) {
|
||||
this.allocatedMB = allocatedMB;
|
||||
}
|
||||
|
||||
public void setReservedVirtualCores(long reservedVirtualCores) {
|
||||
this.reservedVirtualCores = reservedVirtualCores;
|
||||
}
|
||||
|
||||
public void setAvailableVirtualCores(long availableVirtualCores) {
|
||||
this.availableVirtualCores = availableVirtualCores;
|
||||
}
|
||||
|
||||
public void setAllocatedVirtualCores(long allocatedVirtualCores) {
|
||||
this.allocatedVirtualCores = allocatedVirtualCores;
|
||||
}
|
||||
|
||||
public void setContainersAllocated(int containersAllocated) {
|
||||
this.containersAllocated = containersAllocated;
|
||||
}
|
||||
|
||||
public void setTotalMB(long totalMB) {
|
||||
this.totalMB = totalMB;
|
||||
}
|
||||
|
||||
public void setTotalVirtualCores(long totalVirtualCores) {
|
||||
this.totalVirtualCores = totalVirtualCores;
|
||||
}
|
||||
|
||||
public void setTotalNodes(int totalNodes) {
|
||||
this.totalNodes = totalNodes;
|
||||
}
|
||||
|
||||
public void setLostNodes(int lostNodes) {
|
||||
this.lostNodes = lostNodes;
|
||||
}
|
||||
|
||||
public void setUnhealthyNodes(int unhealthyNodes) {
|
||||
this.unhealthyNodes = unhealthyNodes;
|
||||
}
|
||||
|
||||
public void setDecommissioningNodes(int decommissioningNodes) {
|
||||
this.decommissioningNodes = decommissioningNodes;
|
||||
}
|
||||
|
||||
public void setDecommissionedNodes(int decommissionedNodes) {
|
||||
this.decommissionedNodes = decommissionedNodes;
|
||||
}
|
||||
|
||||
public void setRebootedNodes(int rebootedNodes) {
|
||||
this.rebootedNodes = rebootedNodes;
|
||||
}
|
||||
|
||||
public void setActiveNodes(int activeNodes) {
|
||||
this.activeNodes = activeNodes;
|
||||
}
|
||||
|
||||
public void setShutdownNodes(int shutdownNodes) {
|
||||
this.shutdownNodes = shutdownNodes;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -33,16 +33,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@XmlRootElement(name = "node")
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class NodeInfo {
|
||||
|
||||
protected String rack;
|
||||
protected NodeState state;
|
||||
protected String id;
|
||||
private String id;
|
||||
protected String nodeHostName;
|
||||
protected String nodeHTTPAddress;
|
||||
protected long lastHealthUpdate;
|
||||
private long lastHealthUpdate;
|
||||
protected String version;
|
||||
protected String healthReport;
|
||||
protected int numContainers;
|
||||
|
@ -184,4 +186,15 @@ public class NodeInfo {
|
|||
public ResourceUtilizationInfo getResourceUtilization() {
|
||||
return this.resourceUtilization;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setLastHealthUpdate(long lastHealthUpdate) {
|
||||
this.lastHealthUpdate = lastHealthUpdate;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -39,4 +39,8 @@ public class NodesInfo {
|
|||
public ArrayList<NodeInfo> getNodes() {
|
||||
return node;
|
||||
}
|
||||
|
||||
public void addAll(ArrayList<NodeInfo> nodesInfo) {
|
||||
node.addAll(nodesInfo);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -40,6 +39,7 @@ import org.apache.commons.lang.NotImplementedException;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -85,10 +85,12 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
|||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* Extends the {@code AbstractRESTRequestInterceptor} class and provides an
|
||||
|
@ -136,7 +138,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
|
||||
interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
|
||||
routerMetrics = RouterMetrics.getMetrics();
|
||||
threadpool = Executors.newCachedThreadPool();
|
||||
threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder()
|
||||
.setNameFormat("FederationInterceptorREST #%d").build());
|
||||
|
||||
returnPartialReport =
|
||||
conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
|
||||
|
@ -695,6 +698,219 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
returnPartialReport);
|
||||
}
|
||||
|
||||
/**
|
||||
* The Yarn Router will forward to the request to all the SubClusters to find
|
||||
* where the node is running.
|
||||
* <p>
|
||||
* Possible failure:
|
||||
* <p>
|
||||
* Client: identical behavior as {@code RMWebServices}.
|
||||
* <p>
|
||||
* Router: the Client will timeout and resubmit the request.
|
||||
* <p>
|
||||
* ResourceManager: the Router will timeout and the call will fail.
|
||||
* <p>
|
||||
* State Store: the Router will timeout and it will retry depending on the
|
||||
* FederationFacade settings - if the failure happened before the select
|
||||
* operation.
|
||||
*/
|
||||
@Override
|
||||
public NodeInfo getNode(String nodeId) {
|
||||
Map<SubClusterId, SubClusterInfo> subClustersActive = null;
|
||||
try {
|
||||
subClustersActive = federationFacade.getSubClusters(true);
|
||||
} catch (YarnException e) {
|
||||
throw new NotFoundException(e.getMessage());
|
||||
}
|
||||
|
||||
if (subClustersActive.isEmpty()) {
|
||||
throw new NotFoundException(
|
||||
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
|
||||
}
|
||||
|
||||
// Send the requests in parallel
|
||||
|
||||
ExecutorCompletionService<NodeInfo> compSvc =
|
||||
new ExecutorCompletionService<NodeInfo>(this.threadpool);
|
||||
|
||||
for (final SubClusterInfo info : subClustersActive.values()) {
|
||||
compSvc.submit(new Callable<NodeInfo>() {
|
||||
@Override
|
||||
public NodeInfo call() {
|
||||
DefaultRequestInterceptorREST interceptor =
|
||||
getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
|
||||
info.getClientRMServiceAddress());
|
||||
try {
|
||||
NodeInfo nodeInfo = interceptor.getNode(nodeId);
|
||||
return nodeInfo;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Subcluster " + info.getSubClusterId()
|
||||
+ " failed to return nodeInfo.");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Collect all the responses in parallel
|
||||
NodeInfo nodeInfo = null;
|
||||
for (int i = 0; i < subClustersActive.values().size(); i++) {
|
||||
try {
|
||||
Future<NodeInfo> future = compSvc.take();
|
||||
NodeInfo nodeResponse = future.get();
|
||||
|
||||
// Check if the node was found in this SubCluster
|
||||
if (nodeResponse != null) {
|
||||
// Check if the node was already found in a different SubCluster and
|
||||
// it has an old health report
|
||||
if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < nodeResponse
|
||||
.getLastHealthUpdate()) {
|
||||
nodeInfo = nodeResponse;
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to get node report ", e);
|
||||
}
|
||||
}
|
||||
if (nodeInfo == null) {
|
||||
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
|
||||
}
|
||||
return nodeInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* The Yarn Router will forward the request to all the Yarn RMs in parallel,
|
||||
* after that it will remove all the duplicated NodeInfo by using the NodeId.
|
||||
* <p>
|
||||
* Possible failure:
|
||||
* <p>
|
||||
* Client: identical behavior as {@code RMWebServices}.
|
||||
* <p>
|
||||
* Router: the Client will timeout and resubmit the request.
|
||||
* <p>
|
||||
* ResourceManager: the Router calls each Yarn RM in parallel by using one
|
||||
* thread for each Yarn RM. In case a Yarn RM fails, a single call will
|
||||
* timeout. However the Router will use the NodesInfo it got, and provides a
|
||||
* partial list to the client.
|
||||
* <p>
|
||||
* State Store: the Router will timeout and it will retry depending on the
|
||||
* FederationFacade settings - if the failure happened before the select
|
||||
* operation.
|
||||
*/
|
||||
@Override
|
||||
public NodesInfo getNodes(String states) {
|
||||
|
||||
NodesInfo nodes = new NodesInfo();
|
||||
|
||||
Map<SubClusterId, SubClusterInfo> subClustersActive = null;
|
||||
try {
|
||||
subClustersActive = federationFacade.getSubClusters(true);
|
||||
} catch (YarnException e) {
|
||||
LOG.error(e.getMessage());
|
||||
return new NodesInfo();
|
||||
}
|
||||
|
||||
// Send the requests in parallel
|
||||
|
||||
ExecutorCompletionService<NodesInfo> compSvc =
|
||||
new ExecutorCompletionService<NodesInfo>(this.threadpool);
|
||||
|
||||
for (final SubClusterInfo info : subClustersActive.values()) {
|
||||
compSvc.submit(new Callable<NodesInfo>() {
|
||||
@Override
|
||||
public NodesInfo call() {
|
||||
DefaultRequestInterceptorREST interceptor =
|
||||
getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
|
||||
info.getClientRMServiceAddress());
|
||||
try {
|
||||
NodesInfo nodesInfo = interceptor.getNodes(states);
|
||||
return nodesInfo;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Subcluster " + info.getSubClusterId()
|
||||
+ " failed to return nodesInfo.");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Collect all the responses in parallel
|
||||
|
||||
for (int i = 0; i < subClustersActive.values().size(); i++) {
|
||||
try {
|
||||
Future<NodesInfo> future = compSvc.take();
|
||||
NodesInfo nodesResponse = future.get();
|
||||
|
||||
if (nodesResponse != null) {
|
||||
nodes.addAll(nodesResponse.getNodes());
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to get nodes report ", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Delete duplicate from all the node reports got from all the available
|
||||
// Yarn RMs. Nodes can be moved from one subclusters to another. In this
|
||||
// operation they result LOST/RUNNING in the previous SubCluster and
|
||||
// NEW/RUNNING in the new one.
|
||||
|
||||
return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterMetricsInfo getClusterMetricsInfo() {
|
||||
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
|
||||
|
||||
Map<SubClusterId, SubClusterInfo> subClustersActive = null;
|
||||
try {
|
||||
subClustersActive = federationFacade.getSubClusters(true);
|
||||
} catch (YarnException e) {
|
||||
LOG.error(e.getLocalizedMessage());
|
||||
return metrics;
|
||||
}
|
||||
|
||||
// Send the requests in parallel
|
||||
|
||||
ExecutorCompletionService<ClusterMetricsInfo> compSvc =
|
||||
new ExecutorCompletionService<ClusterMetricsInfo>(this.threadpool);
|
||||
|
||||
for (final SubClusterInfo info : subClustersActive.values()) {
|
||||
compSvc.submit(new Callable<ClusterMetricsInfo>() {
|
||||
@Override
|
||||
public ClusterMetricsInfo call() {
|
||||
DefaultRequestInterceptorREST interceptor =
|
||||
getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
|
||||
info.getClientRMServiceAddress());
|
||||
try {
|
||||
ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo();
|
||||
return metrics;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Subcluster " + info.getSubClusterId()
|
||||
+ " failed to return Cluster Metrics.");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Collect all the responses in parallel
|
||||
|
||||
for (int i = 0; i < subClustersActive.values().size(); i++) {
|
||||
try {
|
||||
Future<ClusterMetricsInfo> future = compSvc.take();
|
||||
ClusterMetricsInfo metricsResponse = future.get();
|
||||
|
||||
if (metricsResponse != null) {
|
||||
RouterWebServiceUtil.mergeMetrics(metrics, metricsResponse);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to get nodes report ", e);
|
||||
}
|
||||
}
|
||||
|
||||
return metrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterInfo get() {
|
||||
return getClusterInfo();
|
||||
|
@ -705,11 +921,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterMetricsInfo getClusterMetricsInfo() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchedulerTypeInfo getSchedulerInfo() {
|
||||
throw new NotImplementedException();
|
||||
|
@ -721,16 +932,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodesInfo getNodes(String states) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeInfo getNode(String nodeId) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
|
||||
throw new NotImplementedException();
|
||||
|
@ -933,4 +1134,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
+ "in the chain. Check if the interceptor pipeline configuration "
|
||||
+ "is correct");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (threadpool != null) {
|
||||
threadpool.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
@ -39,6 +40,9 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
|
||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||
import org.apache.hadoop.yarn.webapp.ForbiddenException;
|
||||
|
@ -233,10 +237,10 @@ public final class RouterWebServiceUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Merges a list of AppInfo grouping by ApplicationId. Our current policy
|
||||
* is to merge the application reports from the reacheable SubClusters.
|
||||
* Via configuration parameter, we decide whether to return applications
|
||||
* for which the primary AM is missing or to omit them.
|
||||
* Merges a list of AppInfo grouping by ApplicationId. Our current policy is
|
||||
* to merge the application reports from the reacheable SubClusters. Via
|
||||
* configuration parameter, we decide whether to return applications for which
|
||||
* the primary AM is missing or to omit them.
|
||||
*
|
||||
* @param appsInfo a list of AppInfo to merge
|
||||
* @param returnPartialResult if the merge AppsInfo should contain partial
|
||||
|
@ -331,4 +335,93 @@ public final class RouterWebServiceUtil {
|
|||
am.setVcoreSeconds(am.getVcoreSeconds() + uam.getVcoreSeconds());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes all the duplicate NodeInfo by discarding the old instances.
|
||||
*
|
||||
* @param nodes a list of NodeInfo to check for duplicates
|
||||
* @return a NodesInfo that contains a list of NodeInfos without duplicates
|
||||
*/
|
||||
public static NodesInfo deleteDuplicateNodesInfo(ArrayList<NodeInfo> nodes) {
|
||||
NodesInfo nodesInfo = new NodesInfo();
|
||||
|
||||
Map<String, NodeInfo> nodesMap = new LinkedHashMap<>();
|
||||
for (NodeInfo node : nodes) {
|
||||
String nodeId = node.getNodeId();
|
||||
// If the node already exists, it could be an old instance
|
||||
if (nodesMap.containsKey(nodeId)) {
|
||||
// Check if the node is an old instance
|
||||
if (nodesMap.get(nodeId).getLastHealthUpdate() < node
|
||||
.getLastHealthUpdate()) {
|
||||
nodesMap.put(node.getNodeId(), node);
|
||||
}
|
||||
} else {
|
||||
nodesMap.put(node.getNodeId(), node);
|
||||
}
|
||||
}
|
||||
nodesInfo.addAll(new ArrayList<NodeInfo>(nodesMap.values()));
|
||||
return nodesInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds all the values from the second ClusterMetricsInfo to the first one.
|
||||
*
|
||||
* @param metrics the ClusterMetricsInfo we want to update
|
||||
* @param metricsResponse the ClusterMetricsInfo we want to add to the first
|
||||
* param
|
||||
*/
|
||||
public static void mergeMetrics(ClusterMetricsInfo metrics,
|
||||
ClusterMetricsInfo metricsResponse) {
|
||||
metrics.setAppsSubmitted(
|
||||
metrics.getAppsSubmitted() + metricsResponse.getAppsSubmitted());
|
||||
metrics.setAppsCompleted(
|
||||
metrics.getAppsCompleted() + metricsResponse.getAppsCompleted());
|
||||
metrics.setAppsPending(
|
||||
metrics.getAppsPending() + metricsResponse.getAppsPending());
|
||||
metrics.setAppsRunning(
|
||||
metrics.getAppsRunning() + metricsResponse.getAppsRunning());
|
||||
metrics.setAppsFailed(
|
||||
metrics.getAppsFailed() + metricsResponse.getAppsFailed());
|
||||
metrics.setAppsKilled(
|
||||
metrics.getAppsKilled() + metricsResponse.getAppsKilled());
|
||||
|
||||
metrics.setReservedMB(
|
||||
metrics.getReservedMB() + metricsResponse.getReservedMB());
|
||||
metrics.setAvailableMB(
|
||||
metrics.getAvailableMB() + metricsResponse.getAvailableMB());
|
||||
metrics.setAllocatedMB(
|
||||
metrics.getAllocatedMB() + metricsResponse.getAllocatedMB());
|
||||
|
||||
metrics.setReservedVirtualCores(metrics.getReservedVirtualCores()
|
||||
+ metricsResponse.getReservedVirtualCores());
|
||||
metrics.setAvailableVirtualCores(metrics.getAvailableVirtualCores()
|
||||
+ metricsResponse.getAvailableVirtualCores());
|
||||
metrics.setAllocatedVirtualCores(metrics.getAllocatedVirtualCores()
|
||||
+ metricsResponse.getAllocatedVirtualCores());
|
||||
|
||||
metrics.setContainersAllocated(metrics.getContainersAllocated()
|
||||
+ metricsResponse.getContainersAllocated());
|
||||
metrics.setContainersReserved(metrics.getReservedContainers()
|
||||
+ metricsResponse.getReservedContainers());
|
||||
metrics.setContainersPending(metrics.getPendingContainers()
|
||||
+ metricsResponse.getPendingContainers());
|
||||
|
||||
metrics.setTotalMB(metrics.getTotalMB() + metricsResponse.getTotalMB());
|
||||
metrics.setTotalVirtualCores(
|
||||
metrics.getTotalVirtualCores() + metrics.getTotalVirtualCores());
|
||||
metrics.setTotalNodes(metrics.getTotalNodes() + metrics.getTotalNodes());
|
||||
metrics.setLostNodes(metrics.getLostNodes() + metrics.getLostNodes());
|
||||
metrics.setUnhealthyNodes(
|
||||
metrics.getUnhealthyNodes() + metrics.getUnhealthyNodes());
|
||||
metrics.setDecommissioningNodes(
|
||||
metrics.getDecommissioningNodes() + metrics.getDecommissioningNodes());
|
||||
metrics.setDecommissionedNodes(
|
||||
metrics.getDecommissionedNodes() + metrics.getDecommissionedNodes());
|
||||
metrics.setRebootedNodes(
|
||||
metrics.getRebootedNodes() + metrics.getRebootedNodes());
|
||||
metrics.setActiveNodes(metrics.getActiveNodes() + metrics.getActiveNodes());
|
||||
metrics.setShutdownNodes(
|
||||
metrics.getShutdownNodes() + metrics.getShutdownNodes());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -38,7 +38,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -149,6 +152,46 @@ public class MockDefaultRequestInterceptorREST
|
|||
return Response.status(Status.OK).entity(ret).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeInfo getNode(String nodeId) {
|
||||
if (!isRunning) {
|
||||
throw new RuntimeException("RM is stopped");
|
||||
}
|
||||
NodeInfo node = new NodeInfo();
|
||||
node.setId(nodeId);
|
||||
node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
|
||||
return node;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodesInfo getNodes(String states) {
|
||||
if (!isRunning) {
|
||||
throw new RuntimeException("RM is stopped");
|
||||
}
|
||||
NodeInfo node = new NodeInfo();
|
||||
node.setId("Node " + Integer.valueOf(getSubClusterId().getId()));
|
||||
node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
|
||||
NodesInfo nodes = new NodesInfo();
|
||||
nodes.add(node);
|
||||
return nodes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterMetricsInfo getClusterMetricsInfo() {
|
||||
if (!isRunning) {
|
||||
throw new RuntimeException("RM is stopped");
|
||||
}
|
||||
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
|
||||
metrics.setAppsSubmitted(Integer.valueOf(getSubClusterId().getId()));
|
||||
metrics.setAppsCompleted(Integer.valueOf(getSubClusterId().getId()));
|
||||
metrics.setAppsPending(Integer.valueOf(getSubClusterId().getId()));
|
||||
metrics.setAppsRunning(Integer.valueOf(getSubClusterId().getId()));
|
||||
metrics.setAppsFailed(Integer.valueOf(getSubClusterId().getId()));
|
||||
metrics.setAppsKilled(Integer.valueOf(getSubClusterId().getId()));
|
||||
|
||||
return metrics;
|
||||
}
|
||||
|
||||
public void setSubClusterId(int subClusterId) {
|
||||
setSubClusterId(SubClusterId.newInstance(Integer.toString(subClusterId)));
|
||||
}
|
||||
|
|
|
@ -36,7 +36,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -388,7 +391,56 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
|
|||
|
||||
Assert.assertNotNull(responseGet);
|
||||
Assert.assertEquals(NUM_SUBCLUSTER, responseGet.getApps().size());
|
||||
// The merged operations will be tested in TestRouterWebServiceUtil
|
||||
// The merged operations is tested in TestRouterWebServiceUtil
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNodes in case each subcluster
|
||||
* provided one node with the LastHealthUpdate set to the SubClusterId. The
|
||||
* expected result would be the NodeInfo from the last SubCluster that has
|
||||
* LastHealthUpdate equal to Num_SubCluster -1.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNode() {
|
||||
|
||||
NodeInfo responseGet = interceptor.getNode("testGetNode");
|
||||
|
||||
Assert.assertNotNull(responseGet);
|
||||
Assert.assertEquals(NUM_SUBCLUSTER - 1, responseGet.getLastHealthUpdate());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNodes in case each subcluster
|
||||
* provided one node.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNodes() {
|
||||
|
||||
NodesInfo responseGet = interceptor.getNodes(null);
|
||||
|
||||
Assert.assertNotNull(responseGet);
|
||||
Assert.assertEquals(NUM_SUBCLUSTER, responseGet.getNodes().size());
|
||||
// The remove duplicate operations is tested in TestRouterWebServiceUtil
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of getClusterMetricsInfo in case each
|
||||
* SubCluster provided a ClusterMetricsInfo with appsSubmitted set to the
|
||||
* SubClusterId. The expected result would be appSubmitted equals to the sum
|
||||
* of SubClusterId. SubClusterId in this case is an integer.
|
||||
*/
|
||||
@Test
|
||||
public void testGetClusterMetrics() {
|
||||
|
||||
ClusterMetricsInfo responseGet = interceptor.getClusterMetricsInfo();
|
||||
|
||||
Assert.assertNotNull(responseGet);
|
||||
int expectedAppSubmitted = 0;
|
||||
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
|
||||
expectedAppSubmitted += i;
|
||||
}
|
||||
Assert.assertEquals(expectedAppSubmitted, responseGet.getAppsSubmitted());
|
||||
// The merge operations is tested in TestRouterWebServiceUtil
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -37,9 +37,13 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
|
|||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
|
||||
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -88,9 +92,9 @@ public class TestFederationInterceptorRESTRetry
|
|||
interceptor.init(user);
|
||||
|
||||
// Create SubClusters
|
||||
good = SubClusterId.newInstance("0");
|
||||
bad1 = SubClusterId.newInstance("1");
|
||||
bad2 = SubClusterId.newInstance("2");
|
||||
good = SubClusterId.newInstance("1");
|
||||
bad1 = SubClusterId.newInstance("2");
|
||||
bad2 = SubClusterId.newInstance("3");
|
||||
scs.add(good);
|
||||
scs.add(bad1);
|
||||
scs.add(bad2);
|
||||
|
@ -316,4 +320,201 @@ public class TestFederationInterceptorRESTRetry
|
|||
Assert.assertEquals(1, response.getApps().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNode in case the cluster is
|
||||
* composed of only 1 bad SubCluster.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNodeOneBadSC()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
|
||||
setupCluster(Arrays.asList(bad2));
|
||||
try {
|
||||
interceptor.getNode("testGetNodeOneBadSC");
|
||||
Assert.fail();
|
||||
} catch (NotFoundException e) {
|
||||
Assert.assertTrue(
|
||||
e.getMessage().contains("nodeId, testGetNodeOneBadSC, is not found"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNode in case the cluster is
|
||||
* composed of only 2 bad SubClusters.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNodeTwoBadSCs()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
setupCluster(Arrays.asList(bad1, bad2));
|
||||
|
||||
try {
|
||||
interceptor.getNode("testGetNodeTwoBadSCs");
|
||||
Assert.fail();
|
||||
} catch (NotFoundException e) {
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.contains("nodeId, testGetNodeTwoBadSCs, is not found"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNode in case the cluster is
|
||||
* composed of only 1 bad SubCluster and a good one.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNodeOneBadOneGood()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
setupCluster(Arrays.asList(good, bad2));
|
||||
|
||||
NodeInfo response = interceptor.getNode(null);
|
||||
Assert.assertNotNull(response);
|
||||
// Check if the only node came from Good SubCluster
|
||||
Assert.assertEquals(good.getId(),
|
||||
Long.toString(response.getLastHealthUpdate()));
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNodes in case the cluster is
|
||||
* composed of only 1 bad SubCluster.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNodesOneBadSC()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
|
||||
setupCluster(Arrays.asList(bad2));
|
||||
|
||||
NodesInfo response = interceptor.getNodes(null);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(0, response.getNodes().size());
|
||||
// The remove duplicate operations is tested in TestRouterWebServiceUtil
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNodes in case the cluster is
|
||||
* composed of only 2 bad SubClusters.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNodesTwoBadSCs()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
setupCluster(Arrays.asList(bad1, bad2));
|
||||
|
||||
NodesInfo response = interceptor.getNodes(null);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(0, response.getNodes().size());
|
||||
// The remove duplicate operations is tested in TestRouterWebServiceUtil
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNodes in case the cluster is
|
||||
* composed of only 1 bad SubCluster and a good one.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNodesOneBadOneGood()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
setupCluster(Arrays.asList(good, bad2));
|
||||
|
||||
NodesInfo response = interceptor.getNodes(null);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(1, response.getNodes().size());
|
||||
// Check if the only node came from Good SubCluster
|
||||
Assert.assertEquals(good.getId(),
|
||||
Long.toString(response.getNodes().get(0).getLastHealthUpdate()));
|
||||
// The remove duplicate operations is tested in TestRouterWebServiceUtil
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNodes in case the cluster is
|
||||
* composed of only 1 bad SubCluster. The excepted result would be a
|
||||
* ClusterMetricsInfo with all its values set to 0.
|
||||
*/
|
||||
@Test
|
||||
public void testGetClusterMetricsOneBadSC()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
setupCluster(Arrays.asList(bad2));
|
||||
|
||||
ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
|
||||
Assert.assertNotNull(response);
|
||||
// check if we got an empty metrics
|
||||
checkEmptyMetrics(response);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetClusterMetrics in case the
|
||||
* cluster is composed of only 2 bad SubClusters. The excepted result would be
|
||||
* a ClusterMetricsInfo with all its values set to 0.
|
||||
*/
|
||||
@Test
|
||||
public void testGetClusterMetricsTwoBadSCs()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
setupCluster(Arrays.asList(bad1, bad2));
|
||||
|
||||
ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
|
||||
Assert.assertNotNull(response);
|
||||
// check if we got an empty metrics
|
||||
Assert.assertEquals(0, response.getAppsSubmitted());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetClusterMetrics in case the
|
||||
* cluster is composed of only 1 bad SubCluster and a good one. The good
|
||||
* SubCluster provided a ClusterMetricsInfo with appsSubmitted set to its
|
||||
* SubClusterId. The expected result would be appSubmitted equals to its
|
||||
* SubClusterId. SubClusterId in this case is an integer.
|
||||
*/
|
||||
@Test
|
||||
public void testGetClusterMetricsOneBadOneGood()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
setupCluster(Arrays.asList(good, bad2));
|
||||
|
||||
ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
|
||||
Assert.assertNotNull(response);
|
||||
checkMetricsFromGoodSC(response);
|
||||
// The merge operations is tested in TestRouterWebServiceUtil
|
||||
}
|
||||
|
||||
private void checkMetricsFromGoodSC(ClusterMetricsInfo response) {
|
||||
Assert.assertEquals(Integer.parseInt(good.getId()),
|
||||
response.getAppsSubmitted());
|
||||
Assert.assertEquals(Integer.parseInt(good.getId()),
|
||||
response.getAppsCompleted());
|
||||
Assert.assertEquals(Integer.parseInt(good.getId()),
|
||||
response.getAppsPending());
|
||||
Assert.assertEquals(Integer.parseInt(good.getId()),
|
||||
response.getAppsRunning());
|
||||
Assert.assertEquals(Integer.parseInt(good.getId()),
|
||||
response.getAppsFailed());
|
||||
Assert.assertEquals(Integer.parseInt(good.getId()),
|
||||
response.getAppsKilled());
|
||||
}
|
||||
|
||||
private void checkEmptyMetrics(ClusterMetricsInfo response) {
|
||||
Assert.assertEquals(0, response.getAppsSubmitted());
|
||||
Assert.assertEquals(0, response.getAppsCompleted());
|
||||
Assert.assertEquals(0, response.getAppsPending());
|
||||
Assert.assertEquals(0, response.getAppsRunning());
|
||||
Assert.assertEquals(0, response.getAppsFailed());
|
||||
Assert.assertEquals(0, response.getAppsKilled());
|
||||
|
||||
Assert.assertEquals(0, response.getReservedMB());
|
||||
Assert.assertEquals(0, response.getAvailableMB());
|
||||
Assert.assertEquals(0, response.getAllocatedMB());
|
||||
|
||||
Assert.assertEquals(0, response.getReservedVirtualCores());
|
||||
Assert.assertEquals(0, response.getAvailableVirtualCores());
|
||||
Assert.assertEquals(0, response.getAllocatedVirtualCores());
|
||||
|
||||
Assert.assertEquals(0, response.getContainersAllocated());
|
||||
Assert.assertEquals(0, response.getReservedContainers());
|
||||
Assert.assertEquals(0, response.getPendingContainers());
|
||||
|
||||
Assert.assertEquals(0, response.getTotalMB());
|
||||
Assert.assertEquals(0, response.getTotalVirtualCores());
|
||||
Assert.assertEquals(0, response.getTotalNodes());
|
||||
Assert.assertEquals(0, response.getLostNodes());
|
||||
Assert.assertEquals(0, response.getUnhealthyNodes());
|
||||
Assert.assertEquals(0, response.getDecommissioningNodes());
|
||||
Assert.assertEquals(0, response.getDecommissionedNodes());
|
||||
Assert.assertEquals(0, response.getRebootedNodes());
|
||||
Assert.assertEquals(0, response.getActiveNodes());
|
||||
Assert.assertEquals(0, response.getShutdownNodes());
|
||||
}
|
||||
}
|
|
@ -20,11 +20,15 @@ package org.apache.hadoop.yarn.server.router.webapp;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
|
||||
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
|
||||
import org.junit.Assert;
|
||||
|
@ -40,6 +44,11 @@ public class TestRouterWebServiceUtil {
|
|||
private static final ApplicationId APPID3 = ApplicationId.newInstance(3, 1);
|
||||
private static final ApplicationId APPID4 = ApplicationId.newInstance(4, 1);
|
||||
|
||||
private static final String NODE1 = "Node1";
|
||||
private static final String NODE2 = "Node2";
|
||||
private static final String NODE3 = "Node3";
|
||||
private static final String NODE4 = "Node4";
|
||||
|
||||
/**
|
||||
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
|
||||
* in case we want to merge 4 AMs. The expected result would be the same 4
|
||||
|
@ -308,4 +317,257 @@ public class TestRouterWebServiceUtil {
|
|||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(1, result.getApps().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of
|
||||
* RouterWebServiceUtil#deleteDuplicateNodesInfo in case we want to merge 4
|
||||
* Nodes. The expected result would be the same 4 Nodes.
|
||||
*/
|
||||
@Test
|
||||
public void testDeleteDuplicate4DifferentNodes() {
|
||||
|
||||
NodesInfo nodes = new NodesInfo();
|
||||
|
||||
NodeInfo nodeInfo1 = new NodeInfo();
|
||||
nodeInfo1.setId(NODE1);
|
||||
nodes.add(nodeInfo1);
|
||||
|
||||
NodeInfo nodeInfo2 = new NodeInfo();
|
||||
nodeInfo2.setId(NODE2);
|
||||
nodes.add(nodeInfo2);
|
||||
|
||||
NodeInfo nodeInfo3 = new NodeInfo();
|
||||
nodeInfo3.setId(NODE3);
|
||||
nodes.add(nodeInfo3);
|
||||
|
||||
NodeInfo nodeInfo4 = new NodeInfo();
|
||||
nodeInfo4.setId(NODE4);
|
||||
nodes.add(nodeInfo4);
|
||||
|
||||
NodesInfo result =
|
||||
RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(4, result.getNodes().size());
|
||||
|
||||
List<String> nodesIds = new ArrayList<String>();
|
||||
|
||||
for (NodeInfo node : result.getNodes()) {
|
||||
nodesIds.add(node.getNodeId());
|
||||
}
|
||||
|
||||
Assert.assertTrue(nodesIds.contains(NODE1));
|
||||
Assert.assertTrue(nodesIds.contains(NODE2));
|
||||
Assert.assertTrue(nodesIds.contains(NODE3));
|
||||
Assert.assertTrue(nodesIds.contains(NODE4));
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of
|
||||
* {@link RouterWebServiceUtil#deleteDuplicateNodesInfo(ArrayList)} in case we
|
||||
* want to merge 3 nodes with the same id. The expected result would be 1 node
|
||||
* report with the newest healthy report.
|
||||
*/
|
||||
@Test
|
||||
public void testDeleteDuplicateNodes() {
|
||||
|
||||
NodesInfo nodes = new NodesInfo();
|
||||
|
||||
NodeInfo node1 = new NodeInfo();
|
||||
node1.setId(NODE1);
|
||||
node1.setLastHealthUpdate(0);
|
||||
nodes.add(node1);
|
||||
|
||||
NodeInfo node2 = new NodeInfo();
|
||||
node2.setId(NODE1);
|
||||
node2.setLastHealthUpdate(1);
|
||||
nodes.add(node2);
|
||||
|
||||
NodeInfo node3 = new NodeInfo();
|
||||
node3.setId(NODE1);
|
||||
node3.setLastHealthUpdate(2);
|
||||
nodes.add(node3);
|
||||
|
||||
NodesInfo result =
|
||||
RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(1, result.getNodes().size());
|
||||
|
||||
NodeInfo node = result.getNodes().get(0);
|
||||
|
||||
Assert.assertEquals(NODE1, node.getNodeId());
|
||||
Assert.assertEquals(2, node.getLastHealthUpdate());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of
|
||||
* {@link RouterWebServiceUtil#mergeMetrics}.
|
||||
*/
|
||||
@Test
|
||||
public void testMergeMetrics() {
|
||||
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
|
||||
ClusterMetricsInfo metricsResponse = new ClusterMetricsInfo();
|
||||
|
||||
setUpClusterMetrics(metrics);
|
||||
setUpClusterMetrics(metricsResponse);
|
||||
ClusterMetricsInfo metricsClone = createClusterMetricsClone(metrics);
|
||||
RouterWebServiceUtil.mergeMetrics(metrics, metricsResponse);
|
||||
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getAppsSubmitted() + metricsClone.getAppsSubmitted(),
|
||||
metrics.getAppsSubmitted());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getAppsCompleted() + metricsClone.getAppsCompleted(),
|
||||
metrics.getAppsCompleted());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getAppsPending() + metricsClone.getAppsPending(),
|
||||
metrics.getAppsPending());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getAppsRunning() + metricsClone.getAppsRunning(),
|
||||
metrics.getAppsRunning());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getAppsFailed() + metricsClone.getAppsFailed(),
|
||||
metrics.getAppsFailed());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getAppsKilled() + metricsClone.getAppsKilled(),
|
||||
metrics.getAppsKilled());
|
||||
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getReservedMB() + metricsClone.getReservedMB(),
|
||||
metrics.getReservedMB());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getAvailableMB() + metricsClone.getAvailableMB(),
|
||||
metrics.getAvailableMB());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getAllocatedMB() + metricsClone.getAllocatedMB(),
|
||||
metrics.getAllocatedMB());
|
||||
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getReservedVirtualCores()
|
||||
+ metricsClone.getReservedVirtualCores(),
|
||||
metrics.getReservedVirtualCores());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getAvailableVirtualCores()
|
||||
+ metricsClone.getAvailableVirtualCores(),
|
||||
metrics.getAvailableVirtualCores());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getAllocatedVirtualCores()
|
||||
+ metricsClone.getAllocatedVirtualCores(),
|
||||
metrics.getAllocatedVirtualCores());
|
||||
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getContainersAllocated()
|
||||
+ metricsClone.getContainersAllocated(),
|
||||
metrics.getContainersAllocated());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getReservedContainers()
|
||||
+ metricsClone.getReservedContainers(),
|
||||
metrics.getReservedContainers());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getPendingContainers()
|
||||
+ metricsClone.getPendingContainers(),
|
||||
metrics.getPendingContainers());
|
||||
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getTotalMB() + metricsClone.getTotalMB(),
|
||||
metrics.getTotalMB());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getTotalVirtualCores()
|
||||
+ metricsClone.getTotalVirtualCores(),
|
||||
metrics.getTotalVirtualCores());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getTotalNodes() + metricsClone.getTotalNodes(),
|
||||
metrics.getTotalNodes());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getLostNodes() + metricsClone.getLostNodes(),
|
||||
metrics.getLostNodes());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getUnhealthyNodes() + metricsClone.getUnhealthyNodes(),
|
||||
metrics.getUnhealthyNodes());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getDecommissioningNodes()
|
||||
+ metricsClone.getDecommissioningNodes(),
|
||||
metrics.getDecommissioningNodes());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getDecommissionedNodes()
|
||||
+ metricsClone.getDecommissionedNodes(),
|
||||
metrics.getDecommissionedNodes());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getRebootedNodes() + metricsClone.getRebootedNodes(),
|
||||
metrics.getRebootedNodes());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getActiveNodes() + metricsClone.getActiveNodes(),
|
||||
metrics.getActiveNodes());
|
||||
Assert.assertEquals(
|
||||
metricsResponse.getShutdownNodes() + metricsClone.getShutdownNodes(),
|
||||
metrics.getShutdownNodes());
|
||||
}
|
||||
|
||||
private ClusterMetricsInfo createClusterMetricsClone(
|
||||
ClusterMetricsInfo metrics) {
|
||||
ClusterMetricsInfo metricsClone = new ClusterMetricsInfo();
|
||||
metricsClone.setAppsSubmitted(metrics.getAppsSubmitted());
|
||||
metricsClone.setAppsCompleted(metrics.getAppsCompleted());
|
||||
metricsClone.setAppsPending(metrics.getAppsPending());
|
||||
metricsClone.setAppsRunning(metrics.getAppsRunning());
|
||||
metricsClone.setAppsFailed(metrics.getAppsFailed());
|
||||
metricsClone.setAppsKilled(metrics.getAppsKilled());
|
||||
|
||||
metricsClone.setReservedMB(metrics.getReservedMB());
|
||||
metricsClone.setAvailableMB(metrics.getAvailableMB());
|
||||
metricsClone.setAllocatedMB(metrics.getAllocatedMB());
|
||||
|
||||
metricsClone.setReservedVirtualCores(metrics.getReservedVirtualCores());
|
||||
metricsClone.setAvailableVirtualCores(metrics.getAvailableVirtualCores());
|
||||
metricsClone.setAllocatedVirtualCores(metrics.getAllocatedVirtualCores());
|
||||
|
||||
metricsClone.setContainersAllocated(metrics.getContainersAllocated());
|
||||
metricsClone.setContainersReserved(metrics.getReservedContainers());
|
||||
metricsClone.setContainersPending(metrics.getPendingContainers());
|
||||
|
||||
metricsClone.setTotalMB(metrics.getTotalMB());
|
||||
metricsClone.setTotalVirtualCores(metrics.getTotalVirtualCores());
|
||||
metricsClone.setTotalNodes(metrics.getTotalNodes());
|
||||
metricsClone.setLostNodes(metrics.getLostNodes());
|
||||
metricsClone.setUnhealthyNodes(metrics.getUnhealthyNodes());
|
||||
metricsClone.setDecommissioningNodes(metrics.getDecommissioningNodes());
|
||||
metricsClone.setDecommissionedNodes(metrics.getDecommissionedNodes());
|
||||
metricsClone.setRebootedNodes(metrics.getRebootedNodes());
|
||||
metricsClone.setActiveNodes(metrics.getActiveNodes());
|
||||
metricsClone.setShutdownNodes(metrics.getShutdownNodes());
|
||||
return metricsClone;
|
||||
|
||||
}
|
||||
|
||||
private void setUpClusterMetrics(ClusterMetricsInfo metrics) {
|
||||
Random rand = new Random(System.currentTimeMillis());
|
||||
metrics.setAppsSubmitted(rand.nextInt(1000));
|
||||
metrics.setAppsCompleted(rand.nextInt(1000));
|
||||
metrics.setAppsPending(rand.nextInt(1000));
|
||||
metrics.setAppsRunning(rand.nextInt(1000));
|
||||
metrics.setAppsFailed(rand.nextInt(1000));
|
||||
metrics.setAppsKilled(rand.nextInt(1000));
|
||||
|
||||
metrics.setReservedMB(rand.nextInt(1000));
|
||||
metrics.setAvailableMB(rand.nextInt(1000));
|
||||
metrics.setAllocatedMB(rand.nextInt(1000));
|
||||
|
||||
metrics.setReservedVirtualCores(rand.nextInt(1000));
|
||||
metrics.setAvailableVirtualCores(rand.nextInt(1000));
|
||||
metrics.setAllocatedVirtualCores(rand.nextInt(1000));
|
||||
|
||||
metrics.setContainersAllocated(rand.nextInt(1000));
|
||||
metrics.setContainersReserved(rand.nextInt(1000));
|
||||
metrics.setContainersPending(rand.nextInt(1000));
|
||||
|
||||
metrics.setTotalMB(rand.nextInt(1000));
|
||||
metrics.setTotalVirtualCores(rand.nextInt(1000));
|
||||
metrics.setTotalNodes(rand.nextInt(1000));
|
||||
metrics.setLostNodes(rand.nextInt(1000));
|
||||
metrics.setUnhealthyNodes(rand.nextInt(1000));
|
||||
metrics.setDecommissioningNodes(rand.nextInt(1000));
|
||||
metrics.setDecommissionedNodes(rand.nextInt(1000));
|
||||
metrics.setRebootedNodes(rand.nextInt(1000));
|
||||
metrics.setActiveNodes(rand.nextInt(1000));
|
||||
metrics.setShutdownNodes(rand.nextInt(1000));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue