YARN-7095. Federation: routing getNode/getNodes/getMetrics REST invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru).

(cherry picked from commit bac4e8cca8)
This commit is contained in:
Subru Krishnan 2017-08-31 15:05:41 -07:00
parent 54a694172d
commit adf5ea73d6
9 changed files with 1031 additions and 55 deletions

View File

@ -31,35 +31,35 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
@XmlAccessorType(XmlAccessType.FIELD)
public class ClusterMetricsInfo {
protected int appsSubmitted;
protected int appsCompleted;
protected int appsPending;
protected int appsRunning;
protected int appsFailed;
protected int appsKilled;
private int appsSubmitted;
private int appsCompleted;
private int appsPending;
private int appsRunning;
private int appsFailed;
private int appsKilled;
protected long reservedMB;
protected long availableMB;
protected long allocatedMB;
private long reservedMB;
private long availableMB;
private long allocatedMB;
protected long reservedVirtualCores;
protected long availableVirtualCores;
protected long allocatedVirtualCores;
private long reservedVirtualCores;
private long availableVirtualCores;
private long allocatedVirtualCores;
protected int containersAllocated;
protected int containersReserved;
protected int containersPending;
private int containersAllocated;
private int containersReserved;
private int containersPending;
protected long totalMB;
protected long totalVirtualCores;
protected int totalNodes;
protected int lostNodes;
protected int unhealthyNodes;
protected int decommissioningNodes;
protected int decommissionedNodes;
protected int rebootedNodes;
protected int activeNodes;
protected int shutdownNodes;
private long totalMB;
private long totalVirtualCores;
private int totalNodes;
private int lostNodes;
private int unhealthyNodes;
private int decommissioningNodes;
private int decommissionedNodes;
private int rebootedNodes;
private int activeNodes;
private int shutdownNodes;
public ClusterMetricsInfo() {
} // JAXB needs this
@ -93,8 +93,8 @@ public class ClusterMetricsInfo {
if (rs instanceof CapacityScheduler) {
this.totalMB = availableMB + allocatedMB + reservedMB;
this.totalVirtualCores = availableVirtualCores + allocatedVirtualCores
+ containersReserved;
this.totalVirtualCores =
availableVirtualCores + allocatedVirtualCores + containersReserved;
} else {
this.totalMB = availableMB + allocatedMB;
this.totalVirtualCores = availableVirtualCores + allocatedVirtualCores;
@ -210,4 +210,104 @@ public class ClusterMetricsInfo {
return this.shutdownNodes;
}
public void setContainersReserved(int containersReserved) {
this.containersReserved = containersReserved;
}
public void setContainersPending(int containersPending) {
this.containersPending = containersPending;
}
public void setAppsSubmitted(int appsSubmitted) {
this.appsSubmitted = appsSubmitted;
}
public void setAppsCompleted(int appsCompleted) {
this.appsCompleted = appsCompleted;
}
public void setAppsPending(int appsPending) {
this.appsPending = appsPending;
}
public void setAppsRunning(int appsRunning) {
this.appsRunning = appsRunning;
}
public void setAppsFailed(int appsFailed) {
this.appsFailed = appsFailed;
}
public void setAppsKilled(int appsKilled) {
this.appsKilled = appsKilled;
}
public void setReservedMB(long reservedMB) {
this.reservedMB = reservedMB;
}
public void setAvailableMB(long availableMB) {
this.availableMB = availableMB;
}
public void setAllocatedMB(long allocatedMB) {
this.allocatedMB = allocatedMB;
}
public void setReservedVirtualCores(long reservedVirtualCores) {
this.reservedVirtualCores = reservedVirtualCores;
}
public void setAvailableVirtualCores(long availableVirtualCores) {
this.availableVirtualCores = availableVirtualCores;
}
public void setAllocatedVirtualCores(long allocatedVirtualCores) {
this.allocatedVirtualCores = allocatedVirtualCores;
}
public void setContainersAllocated(int containersAllocated) {
this.containersAllocated = containersAllocated;
}
public void setTotalMB(long totalMB) {
this.totalMB = totalMB;
}
public void setTotalVirtualCores(long totalVirtualCores) {
this.totalVirtualCores = totalVirtualCores;
}
public void setTotalNodes(int totalNodes) {
this.totalNodes = totalNodes;
}
public void setLostNodes(int lostNodes) {
this.lostNodes = lostNodes;
}
public void setUnhealthyNodes(int unhealthyNodes) {
this.unhealthyNodes = unhealthyNodes;
}
public void setDecommissioningNodes(int decommissioningNodes) {
this.decommissioningNodes = decommissioningNodes;
}
public void setDecommissionedNodes(int decommissionedNodes) {
this.decommissionedNodes = decommissionedNodes;
}
public void setRebootedNodes(int rebootedNodes) {
this.rebootedNodes = rebootedNodes;
}
public void setActiveNodes(int activeNodes) {
this.activeNodes = activeNodes;
}
public void setShutdownNodes(int shutdownNodes) {
this.shutdownNodes = shutdownNodes;
}
}

View File

@ -33,16 +33,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import com.google.common.annotations.VisibleForTesting;
@XmlRootElement(name = "node")
@XmlAccessorType(XmlAccessType.FIELD)
public class NodeInfo {
protected String rack;
protected NodeState state;
protected String id;
private String id;
protected String nodeHostName;
protected String nodeHTTPAddress;
protected long lastHealthUpdate;
private long lastHealthUpdate;
protected String version;
protected String healthReport;
protected int numContainers;
@ -184,4 +186,15 @@ public class NodeInfo {
public ResourceUtilizationInfo getResourceUtilization() {
return this.resourceUtilization;
}
@VisibleForTesting
public void setId(String id) {
this.id = id;
}
@VisibleForTesting
public void setLastHealthUpdate(long lastHealthUpdate) {
this.lastHealthUpdate = lastHealthUpdate;
}
}

View File

@ -39,4 +39,8 @@ public class NodesInfo {
public ArrayList<NodeInfo> getNodes() {
return node;
}
public void addAll(ArrayList<NodeInfo> nodesInfo) {
node.addAll(nodesInfo);
}
}

View File

@ -28,7 +28,6 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.servlet.http.HttpServletRequest;
@ -40,6 +39,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -85,10 +85,12 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Extends the {@code AbstractRESTRequestInterceptor} class and provides an
@ -136,7 +138,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
routerMetrics = RouterMetrics.getMetrics();
threadpool = Executors.newCachedThreadPool();
threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat("FederationInterceptorREST #%d").build());
returnPartialReport =
conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
@ -697,6 +700,219 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
returnPartialReport);
}
/**
* The Yarn Router will forward to the request to all the SubClusters to find
* where the node is running.
* <p>
* Possible failure:
* <p>
* Client: identical behavior as {@code RMWebServices}.
* <p>
* Router: the Client will timeout and resubmit the request.
* <p>
* ResourceManager: the Router will timeout and the call will fail.
* <p>
* State Store: the Router will timeout and it will retry depending on the
* FederationFacade settings - if the failure happened before the select
* operation.
*/
@Override
public NodeInfo getNode(final String nodeId) {
Map<SubClusterId, SubClusterInfo> subClustersActive = null;
try {
subClustersActive = federationFacade.getSubClusters(true);
} catch (YarnException e) {
throw new NotFoundException(e.getMessage());
}
if (subClustersActive.isEmpty()) {
throw new NotFoundException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
}
// Send the requests in parallel
ExecutorCompletionService<NodeInfo> compSvc =
new ExecutorCompletionService<NodeInfo>(this.threadpool);
for (final SubClusterInfo info : subClustersActive.values()) {
compSvc.submit(new Callable<NodeInfo>() {
@Override
public NodeInfo call() {
DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
info.getClientRMServiceAddress());
try {
NodeInfo nodeInfo = interceptor.getNode(nodeId);
return nodeInfo;
} catch (Exception e) {
LOG.error("Subcluster " + info.getSubClusterId()
+ " failed to return nodeInfo.");
return null;
}
}
});
}
// Collect all the responses in parallel
NodeInfo nodeInfo = null;
for (int i = 0; i < subClustersActive.values().size(); i++) {
try {
Future<NodeInfo> future = compSvc.take();
NodeInfo nodeResponse = future.get();
// Check if the node was found in this SubCluster
if (nodeResponse != null) {
// Check if the node was already found in a different SubCluster and
// it has an old health report
if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < nodeResponse
.getLastHealthUpdate()) {
nodeInfo = nodeResponse;
}
}
} catch (Throwable e) {
LOG.warn("Failed to get node report ", e);
}
}
if (nodeInfo == null) {
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
}
return nodeInfo;
}
/**
* The Yarn Router will forward the request to all the Yarn RMs in parallel,
* after that it will remove all the duplicated NodeInfo by using the NodeId.
* <p>
* Possible failure:
* <p>
* Client: identical behavior as {@code RMWebServices}.
* <p>
* Router: the Client will timeout and resubmit the request.
* <p>
* ResourceManager: the Router calls each Yarn RM in parallel by using one
* thread for each Yarn RM. In case a Yarn RM fails, a single call will
* timeout. However the Router will use the NodesInfo it got, and provides a
* partial list to the client.
* <p>
* State Store: the Router will timeout and it will retry depending on the
* FederationFacade settings - if the failure happened before the select
* operation.
*/
@Override
public NodesInfo getNodes(final String states) {
NodesInfo nodes = new NodesInfo();
Map<SubClusterId, SubClusterInfo> subClustersActive = null;
try {
subClustersActive = federationFacade.getSubClusters(true);
} catch (YarnException e) {
LOG.error(e.getMessage());
return new NodesInfo();
}
// Send the requests in parallel
ExecutorCompletionService<NodesInfo> compSvc =
new ExecutorCompletionService<NodesInfo>(this.threadpool);
for (final SubClusterInfo info : subClustersActive.values()) {
compSvc.submit(new Callable<NodesInfo>() {
@Override
public NodesInfo call() {
DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
info.getClientRMServiceAddress());
try {
NodesInfo nodesInfo = interceptor.getNodes(states);
return nodesInfo;
} catch (Exception e) {
LOG.error("Subcluster " + info.getSubClusterId()
+ " failed to return nodesInfo.");
return null;
}
}
});
}
// Collect all the responses in parallel
for (int i = 0; i < subClustersActive.values().size(); i++) {
try {
Future<NodesInfo> future = compSvc.take();
NodesInfo nodesResponse = future.get();
if (nodesResponse != null) {
nodes.addAll(nodesResponse.getNodes());
}
} catch (Throwable e) {
LOG.warn("Failed to get nodes report ", e);
}
}
// Delete duplicate from all the node reports got from all the available
// Yarn RMs. Nodes can be moved from one subclusters to another. In this
// operation they result LOST/RUNNING in the previous SubCluster and
// NEW/RUNNING in the new one.
return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
}
@Override
public ClusterMetricsInfo getClusterMetricsInfo() {
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
Map<SubClusterId, SubClusterInfo> subClustersActive = null;
try {
subClustersActive = federationFacade.getSubClusters(true);
} catch (YarnException e) {
LOG.error(e.getLocalizedMessage());
return metrics;
}
// Send the requests in parallel
ExecutorCompletionService<ClusterMetricsInfo> compSvc =
new ExecutorCompletionService<ClusterMetricsInfo>(this.threadpool);
for (final SubClusterInfo info : subClustersActive.values()) {
compSvc.submit(new Callable<ClusterMetricsInfo>() {
@Override
public ClusterMetricsInfo call() {
DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
info.getClientRMServiceAddress());
try {
ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo();
return metrics;
} catch (Exception e) {
LOG.error("Subcluster " + info.getSubClusterId()
+ " failed to return Cluster Metrics.");
return null;
}
}
});
}
// Collect all the responses in parallel
for (int i = 0; i < subClustersActive.values().size(); i++) {
try {
Future<ClusterMetricsInfo> future = compSvc.take();
ClusterMetricsInfo metricsResponse = future.get();
if (metricsResponse != null) {
RouterWebServiceUtil.mergeMetrics(metrics, metricsResponse);
}
} catch (Throwable e) {
LOG.warn("Failed to get nodes report ", e);
}
}
return metrics;
}
@Override
public ClusterInfo get() {
return getClusterInfo();
@ -707,11 +923,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
throw new NotImplementedException();
}
@Override
public ClusterMetricsInfo getClusterMetricsInfo() {
throw new NotImplementedException();
}
@Override
public SchedulerTypeInfo getSchedulerInfo() {
throw new NotImplementedException();
@ -723,16 +934,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
throw new NotImplementedException();
}
@Override
public NodesInfo getNodes(String states) {
throw new NotImplementedException();
}
@Override
public NodeInfo getNode(String nodeId) {
throw new NotImplementedException();
}
@Override
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
throw new NotImplementedException();
@ -935,4 +1136,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
+ "in the chain. Check if the interceptor pipeline configuration "
+ "is correct");
}
@Override
public void shutdown() {
if (threadpool != null) {
threadpool.shutdown();
}
}
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -39,6 +40,9 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
@ -234,10 +238,10 @@ public final class RouterWebServiceUtil {
}
/**
* Merges a list of AppInfo grouping by ApplicationId. Our current policy
* is to merge the application reports from the reacheable SubClusters.
* Via configuration parameter, we decide whether to return applications
* for which the primary AM is missing or to omit them.
* Merges a list of AppInfo grouping by ApplicationId. Our current policy is
* to merge the application reports from the reacheable SubClusters. Via
* configuration parameter, we decide whether to return applications for which
* the primary AM is missing or to omit them.
*
* @param appsInfo a list of AppInfo to merge
* @param returnPartialResult if the merge AppsInfo should contain partial
@ -332,4 +336,93 @@ public final class RouterWebServiceUtil {
am.setVcoreSeconds(am.getVcoreSeconds() + uam.getVcoreSeconds());
}
}
/**
* Deletes all the duplicate NodeInfo by discarding the old instances.
*
* @param nodes a list of NodeInfo to check for duplicates
* @return a NodesInfo that contains a list of NodeInfos without duplicates
*/
public static NodesInfo deleteDuplicateNodesInfo(ArrayList<NodeInfo> nodes) {
NodesInfo nodesInfo = new NodesInfo();
Map<String, NodeInfo> nodesMap = new LinkedHashMap<>();
for (NodeInfo node : nodes) {
String nodeId = node.getNodeId();
// If the node already exists, it could be an old instance
if (nodesMap.containsKey(nodeId)) {
// Check if the node is an old instance
if (nodesMap.get(nodeId).getLastHealthUpdate() < node
.getLastHealthUpdate()) {
nodesMap.put(node.getNodeId(), node);
}
} else {
nodesMap.put(node.getNodeId(), node);
}
}
nodesInfo.addAll(new ArrayList<NodeInfo>(nodesMap.values()));
return nodesInfo;
}
/**
* Adds all the values from the second ClusterMetricsInfo to the first one.
*
* @param metrics the ClusterMetricsInfo we want to update
* @param metricsResponse the ClusterMetricsInfo we want to add to the first
* param
*/
public static void mergeMetrics(ClusterMetricsInfo metrics,
ClusterMetricsInfo metricsResponse) {
metrics.setAppsSubmitted(
metrics.getAppsSubmitted() + metricsResponse.getAppsSubmitted());
metrics.setAppsCompleted(
metrics.getAppsCompleted() + metricsResponse.getAppsCompleted());
metrics.setAppsPending(
metrics.getAppsPending() + metricsResponse.getAppsPending());
metrics.setAppsRunning(
metrics.getAppsRunning() + metricsResponse.getAppsRunning());
metrics.setAppsFailed(
metrics.getAppsFailed() + metricsResponse.getAppsFailed());
metrics.setAppsKilled(
metrics.getAppsKilled() + metricsResponse.getAppsKilled());
metrics.setReservedMB(
metrics.getReservedMB() + metricsResponse.getReservedMB());
metrics.setAvailableMB(
metrics.getAvailableMB() + metricsResponse.getAvailableMB());
metrics.setAllocatedMB(
metrics.getAllocatedMB() + metricsResponse.getAllocatedMB());
metrics.setReservedVirtualCores(metrics.getReservedVirtualCores()
+ metricsResponse.getReservedVirtualCores());
metrics.setAvailableVirtualCores(metrics.getAvailableVirtualCores()
+ metricsResponse.getAvailableVirtualCores());
metrics.setAllocatedVirtualCores(metrics.getAllocatedVirtualCores()
+ metricsResponse.getAllocatedVirtualCores());
metrics.setContainersAllocated(metrics.getContainersAllocated()
+ metricsResponse.getContainersAllocated());
metrics.setContainersReserved(metrics.getReservedContainers()
+ metricsResponse.getReservedContainers());
metrics.setContainersPending(metrics.getPendingContainers()
+ metricsResponse.getPendingContainers());
metrics.setTotalMB(metrics.getTotalMB() + metricsResponse.getTotalMB());
metrics.setTotalVirtualCores(
metrics.getTotalVirtualCores() + metrics.getTotalVirtualCores());
metrics.setTotalNodes(metrics.getTotalNodes() + metrics.getTotalNodes());
metrics.setLostNodes(metrics.getLostNodes() + metrics.getLostNodes());
metrics.setUnhealthyNodes(
metrics.getUnhealthyNodes() + metrics.getUnhealthyNodes());
metrics.setDecommissioningNodes(
metrics.getDecommissioningNodes() + metrics.getDecommissioningNodes());
metrics.setDecommissionedNodes(
metrics.getDecommissionedNodes() + metrics.getDecommissionedNodes());
metrics.setRebootedNodes(
metrics.getRebootedNodes() + metrics.getRebootedNodes());
metrics.setActiveNodes(metrics.getActiveNodes() + metrics.getActiveNodes());
metrics.setShutdownNodes(
metrics.getShutdownNodes() + metrics.getShutdownNodes());
}
}

View File

@ -38,7 +38,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
@ -149,6 +152,46 @@ public class MockDefaultRequestInterceptorREST
return Response.status(Status.OK).entity(ret).build();
}
@Override
public NodeInfo getNode(String nodeId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
NodeInfo node = new NodeInfo();
node.setId(nodeId);
node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
return node;
}
@Override
public NodesInfo getNodes(String states) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
NodeInfo node = new NodeInfo();
node.setId("Node " + Integer.valueOf(getSubClusterId().getId()));
node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
NodesInfo nodes = new NodesInfo();
nodes.add(node);
return nodes;
}
@Override
public ClusterMetricsInfo getClusterMetricsInfo() {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
metrics.setAppsSubmitted(Integer.valueOf(getSubClusterId().getId()));
metrics.setAppsCompleted(Integer.valueOf(getSubClusterId().getId()));
metrics.setAppsPending(Integer.valueOf(getSubClusterId().getId()));
metrics.setAppsRunning(Integer.valueOf(getSubClusterId().getId()));
metrics.setAppsFailed(Integer.valueOf(getSubClusterId().getId()));
metrics.setAppsKilled(Integer.valueOf(getSubClusterId().getId()));
return metrics;
}
public void setSubClusterId(int subClusterId) {
setSubClusterId(SubClusterId.newInstance(Integer.toString(subClusterId)));
}

View File

@ -36,7 +36,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@ -388,7 +391,56 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
Assert.assertNotNull(responseGet);
Assert.assertEquals(NUM_SUBCLUSTER, responseGet.getApps().size());
// The merged operations will be tested in TestRouterWebServiceUtil
// The merged operations is tested in TestRouterWebServiceUtil
}
/**
* This test validates the correctness of GetNodes in case each subcluster
* provided one node with the LastHealthUpdate set to the SubClusterId. The
* expected result would be the NodeInfo from the last SubCluster that has
* LastHealthUpdate equal to Num_SubCluster -1.
*/
@Test
public void testGetNode() {
NodeInfo responseGet = interceptor.getNode("testGetNode");
Assert.assertNotNull(responseGet);
Assert.assertEquals(NUM_SUBCLUSTER - 1, responseGet.getLastHealthUpdate());
}
/**
* This test validates the correctness of GetNodes in case each subcluster
* provided one node.
*/
@Test
public void testGetNodes() {
NodesInfo responseGet = interceptor.getNodes(null);
Assert.assertNotNull(responseGet);
Assert.assertEquals(NUM_SUBCLUSTER, responseGet.getNodes().size());
// The remove duplicate operations is tested in TestRouterWebServiceUtil
}
/**
* This test validates the correctness of getClusterMetricsInfo in case each
* SubCluster provided a ClusterMetricsInfo with appsSubmitted set to the
* SubClusterId. The expected result would be appSubmitted equals to the sum
* of SubClusterId. SubClusterId in this case is an integer.
*/
@Test
public void testGetClusterMetrics() {
ClusterMetricsInfo responseGet = interceptor.getClusterMetricsInfo();
Assert.assertNotNull(responseGet);
int expectedAppSubmitted = 0;
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
expectedAppSubmitted += i;
}
Assert.assertEquals(expectedAppSubmitted, responseGet.getAppsSubmitted());
// The merge operations is tested in TestRouterWebServiceUtil
}
}

View File

@ -37,9 +37,13 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@ -88,9 +92,9 @@ public class TestFederationInterceptorRESTRetry
interceptor.init(user);
// Create SubClusters
good = SubClusterId.newInstance("0");
bad1 = SubClusterId.newInstance("1");
bad2 = SubClusterId.newInstance("2");
good = SubClusterId.newInstance("1");
bad1 = SubClusterId.newInstance("2");
bad2 = SubClusterId.newInstance("3");
scs.add(good);
scs.add(bad1);
scs.add(bad2);
@ -316,4 +320,201 @@ public class TestFederationInterceptorRESTRetry
Assert.assertEquals(1, response.getApps().size());
}
/**
* This test validates the correctness of GetNode in case the cluster is
* composed of only 1 bad SubCluster.
*/
@Test
public void testGetNodeOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
try {
interceptor.getNode("testGetNodeOneBadSC");
Assert.fail();
} catch (NotFoundException e) {
Assert.assertTrue(
e.getMessage().contains("nodeId, testGetNodeOneBadSC, is not found"));
}
}
/**
* This test validates the correctness of GetNode in case the cluster is
* composed of only 2 bad SubClusters.
*/
@Test
public void testGetNodeTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
try {
interceptor.getNode("testGetNodeTwoBadSCs");
Assert.fail();
} catch (NotFoundException e) {
Assert.assertTrue(e.getMessage()
.contains("nodeId, testGetNodeTwoBadSCs, is not found"));
}
}
/**
* This test validates the correctness of GetNode in case the cluster is
* composed of only 1 bad SubCluster and a good one.
*/
@Test
public void testGetNodeOneBadOneGood()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(good, bad2));
NodeInfo response = interceptor.getNode(null);
Assert.assertNotNull(response);
// Check if the only node came from Good SubCluster
Assert.assertEquals(good.getId(),
Long.toString(response.getLastHealthUpdate()));
}
/**
* This test validates the correctness of GetNodes in case the cluster is
* composed of only 1 bad SubCluster.
*/
@Test
public void testGetNodesOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
NodesInfo response = interceptor.getNodes(null);
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getNodes().size());
// The remove duplicate operations is tested in TestRouterWebServiceUtil
}
/**
* This test validates the correctness of GetNodes in case the cluster is
* composed of only 2 bad SubClusters.
*/
@Test
public void testGetNodesTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
NodesInfo response = interceptor.getNodes(null);
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getNodes().size());
// The remove duplicate operations is tested in TestRouterWebServiceUtil
}
/**
* This test validates the correctness of GetNodes in case the cluster is
* composed of only 1 bad SubCluster and a good one.
*/
@Test
public void testGetNodesOneBadOneGood()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(good, bad2));
NodesInfo response = interceptor.getNodes(null);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getNodes().size());
// Check if the only node came from Good SubCluster
Assert.assertEquals(good.getId(),
Long.toString(response.getNodes().get(0).getLastHealthUpdate()));
// The remove duplicate operations is tested in TestRouterWebServiceUtil
}
/**
* This test validates the correctness of GetNodes in case the cluster is
* composed of only 1 bad SubCluster. The excepted result would be a
* ClusterMetricsInfo with all its values set to 0.
*/
@Test
public void testGetClusterMetricsOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
Assert.assertNotNull(response);
// check if we got an empty metrics
checkEmptyMetrics(response);
}
/**
* This test validates the correctness of GetClusterMetrics in case the
* cluster is composed of only 2 bad SubClusters. The excepted result would be
* a ClusterMetricsInfo with all its values set to 0.
*/
@Test
public void testGetClusterMetricsTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
Assert.assertNotNull(response);
// check if we got an empty metrics
Assert.assertEquals(0, response.getAppsSubmitted());
}
/**
* This test validates the correctness of GetClusterMetrics in case the
* cluster is composed of only 1 bad SubCluster and a good one. The good
* SubCluster provided a ClusterMetricsInfo with appsSubmitted set to its
* SubClusterId. The expected result would be appSubmitted equals to its
* SubClusterId. SubClusterId in this case is an integer.
*/
@Test
public void testGetClusterMetricsOneBadOneGood()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(good, bad2));
ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
Assert.assertNotNull(response);
checkMetricsFromGoodSC(response);
// The merge operations is tested in TestRouterWebServiceUtil
}
private void checkMetricsFromGoodSC(ClusterMetricsInfo response) {
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsSubmitted());
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsCompleted());
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsPending());
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsRunning());
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsFailed());
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsKilled());
}
private void checkEmptyMetrics(ClusterMetricsInfo response) {
Assert.assertEquals(0, response.getAppsSubmitted());
Assert.assertEquals(0, response.getAppsCompleted());
Assert.assertEquals(0, response.getAppsPending());
Assert.assertEquals(0, response.getAppsRunning());
Assert.assertEquals(0, response.getAppsFailed());
Assert.assertEquals(0, response.getAppsKilled());
Assert.assertEquals(0, response.getReservedMB());
Assert.assertEquals(0, response.getAvailableMB());
Assert.assertEquals(0, response.getAllocatedMB());
Assert.assertEquals(0, response.getReservedVirtualCores());
Assert.assertEquals(0, response.getAvailableVirtualCores());
Assert.assertEquals(0, response.getAllocatedVirtualCores());
Assert.assertEquals(0, response.getContainersAllocated());
Assert.assertEquals(0, response.getReservedContainers());
Assert.assertEquals(0, response.getPendingContainers());
Assert.assertEquals(0, response.getTotalMB());
Assert.assertEquals(0, response.getTotalVirtualCores());
Assert.assertEquals(0, response.getTotalNodes());
Assert.assertEquals(0, response.getLostNodes());
Assert.assertEquals(0, response.getUnhealthyNodes());
Assert.assertEquals(0, response.getDecommissioningNodes());
Assert.assertEquals(0, response.getDecommissionedNodes());
Assert.assertEquals(0, response.getRebootedNodes());
Assert.assertEquals(0, response.getActiveNodes());
Assert.assertEquals(0, response.getShutdownNodes());
}
}

View File

@ -20,11 +20,15 @@ package org.apache.hadoop.yarn.server.router.webapp;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.junit.Assert;
@ -40,6 +44,11 @@ public class TestRouterWebServiceUtil {
private static final ApplicationId APPID3 = ApplicationId.newInstance(3, 1);
private static final ApplicationId APPID4 = ApplicationId.newInstance(4, 1);
private static final String NODE1 = "Node1";
private static final String NODE2 = "Node2";
private static final String NODE3 = "Node3";
private static final String NODE4 = "Node4";
/**
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
* in case we want to merge 4 AMs. The expected result would be the same 4
@ -308,4 +317,257 @@ public class TestRouterWebServiceUtil {
Assert.assertNotNull(result);
Assert.assertEquals(1, result.getApps().size());
}
/**
* This test validates the correctness of
* RouterWebServiceUtil#deleteDuplicateNodesInfo in case we want to merge 4
* Nodes. The expected result would be the same 4 Nodes.
*/
@Test
public void testDeleteDuplicate4DifferentNodes() {
NodesInfo nodes = new NodesInfo();
NodeInfo nodeInfo1 = new NodeInfo();
nodeInfo1.setId(NODE1);
nodes.add(nodeInfo1);
NodeInfo nodeInfo2 = new NodeInfo();
nodeInfo2.setId(NODE2);
nodes.add(nodeInfo2);
NodeInfo nodeInfo3 = new NodeInfo();
nodeInfo3.setId(NODE3);
nodes.add(nodeInfo3);
NodeInfo nodeInfo4 = new NodeInfo();
nodeInfo4.setId(NODE4);
nodes.add(nodeInfo4);
NodesInfo result =
RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
Assert.assertNotNull(result);
Assert.assertEquals(4, result.getNodes().size());
List<String> nodesIds = new ArrayList<String>();
for (NodeInfo node : result.getNodes()) {
nodesIds.add(node.getNodeId());
}
Assert.assertTrue(nodesIds.contains(NODE1));
Assert.assertTrue(nodesIds.contains(NODE2));
Assert.assertTrue(nodesIds.contains(NODE3));
Assert.assertTrue(nodesIds.contains(NODE4));
}
/**
* This test validates the correctness of
* {@link RouterWebServiceUtil#deleteDuplicateNodesInfo(ArrayList)} in case we
* want to merge 3 nodes with the same id. The expected result would be 1 node
* report with the newest healthy report.
*/
@Test
public void testDeleteDuplicateNodes() {
NodesInfo nodes = new NodesInfo();
NodeInfo node1 = new NodeInfo();
node1.setId(NODE1);
node1.setLastHealthUpdate(0);
nodes.add(node1);
NodeInfo node2 = new NodeInfo();
node2.setId(NODE1);
node2.setLastHealthUpdate(1);
nodes.add(node2);
NodeInfo node3 = new NodeInfo();
node3.setId(NODE1);
node3.setLastHealthUpdate(2);
nodes.add(node3);
NodesInfo result =
RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
Assert.assertNotNull(result);
Assert.assertEquals(1, result.getNodes().size());
NodeInfo node = result.getNodes().get(0);
Assert.assertEquals(NODE1, node.getNodeId());
Assert.assertEquals(2, node.getLastHealthUpdate());
}
/**
* This test validates the correctness of
* {@link RouterWebServiceUtil#mergeMetrics}.
*/
@Test
public void testMergeMetrics() {
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
ClusterMetricsInfo metricsResponse = new ClusterMetricsInfo();
setUpClusterMetrics(metrics);
setUpClusterMetrics(metricsResponse);
ClusterMetricsInfo metricsClone = createClusterMetricsClone(metrics);
RouterWebServiceUtil.mergeMetrics(metrics, metricsResponse);
Assert.assertEquals(
metricsResponse.getAppsSubmitted() + metricsClone.getAppsSubmitted(),
metrics.getAppsSubmitted());
Assert.assertEquals(
metricsResponse.getAppsCompleted() + metricsClone.getAppsCompleted(),
metrics.getAppsCompleted());
Assert.assertEquals(
metricsResponse.getAppsPending() + metricsClone.getAppsPending(),
metrics.getAppsPending());
Assert.assertEquals(
metricsResponse.getAppsRunning() + metricsClone.getAppsRunning(),
metrics.getAppsRunning());
Assert.assertEquals(
metricsResponse.getAppsFailed() + metricsClone.getAppsFailed(),
metrics.getAppsFailed());
Assert.assertEquals(
metricsResponse.getAppsKilled() + metricsClone.getAppsKilled(),
metrics.getAppsKilled());
Assert.assertEquals(
metricsResponse.getReservedMB() + metricsClone.getReservedMB(),
metrics.getReservedMB());
Assert.assertEquals(
metricsResponse.getAvailableMB() + metricsClone.getAvailableMB(),
metrics.getAvailableMB());
Assert.assertEquals(
metricsResponse.getAllocatedMB() + metricsClone.getAllocatedMB(),
metrics.getAllocatedMB());
Assert.assertEquals(
metricsResponse.getReservedVirtualCores()
+ metricsClone.getReservedVirtualCores(),
metrics.getReservedVirtualCores());
Assert.assertEquals(
metricsResponse.getAvailableVirtualCores()
+ metricsClone.getAvailableVirtualCores(),
metrics.getAvailableVirtualCores());
Assert.assertEquals(
metricsResponse.getAllocatedVirtualCores()
+ metricsClone.getAllocatedVirtualCores(),
metrics.getAllocatedVirtualCores());
Assert.assertEquals(
metricsResponse.getContainersAllocated()
+ metricsClone.getContainersAllocated(),
metrics.getContainersAllocated());
Assert.assertEquals(
metricsResponse.getReservedContainers()
+ metricsClone.getReservedContainers(),
metrics.getReservedContainers());
Assert.assertEquals(
metricsResponse.getPendingContainers()
+ metricsClone.getPendingContainers(),
metrics.getPendingContainers());
Assert.assertEquals(
metricsResponse.getTotalMB() + metricsClone.getTotalMB(),
metrics.getTotalMB());
Assert.assertEquals(
metricsResponse.getTotalVirtualCores()
+ metricsClone.getTotalVirtualCores(),
metrics.getTotalVirtualCores());
Assert.assertEquals(
metricsResponse.getTotalNodes() + metricsClone.getTotalNodes(),
metrics.getTotalNodes());
Assert.assertEquals(
metricsResponse.getLostNodes() + metricsClone.getLostNodes(),
metrics.getLostNodes());
Assert.assertEquals(
metricsResponse.getUnhealthyNodes() + metricsClone.getUnhealthyNodes(),
metrics.getUnhealthyNodes());
Assert.assertEquals(
metricsResponse.getDecommissioningNodes()
+ metricsClone.getDecommissioningNodes(),
metrics.getDecommissioningNodes());
Assert.assertEquals(
metricsResponse.getDecommissionedNodes()
+ metricsClone.getDecommissionedNodes(),
metrics.getDecommissionedNodes());
Assert.assertEquals(
metricsResponse.getRebootedNodes() + metricsClone.getRebootedNodes(),
metrics.getRebootedNodes());
Assert.assertEquals(
metricsResponse.getActiveNodes() + metricsClone.getActiveNodes(),
metrics.getActiveNodes());
Assert.assertEquals(
metricsResponse.getShutdownNodes() + metricsClone.getShutdownNodes(),
metrics.getShutdownNodes());
}
private ClusterMetricsInfo createClusterMetricsClone(
ClusterMetricsInfo metrics) {
ClusterMetricsInfo metricsClone = new ClusterMetricsInfo();
metricsClone.setAppsSubmitted(metrics.getAppsSubmitted());
metricsClone.setAppsCompleted(metrics.getAppsCompleted());
metricsClone.setAppsPending(metrics.getAppsPending());
metricsClone.setAppsRunning(metrics.getAppsRunning());
metricsClone.setAppsFailed(metrics.getAppsFailed());
metricsClone.setAppsKilled(metrics.getAppsKilled());
metricsClone.setReservedMB(metrics.getReservedMB());
metricsClone.setAvailableMB(metrics.getAvailableMB());
metricsClone.setAllocatedMB(metrics.getAllocatedMB());
metricsClone.setReservedVirtualCores(metrics.getReservedVirtualCores());
metricsClone.setAvailableVirtualCores(metrics.getAvailableVirtualCores());
metricsClone.setAllocatedVirtualCores(metrics.getAllocatedVirtualCores());
metricsClone.setContainersAllocated(metrics.getContainersAllocated());
metricsClone.setContainersReserved(metrics.getReservedContainers());
metricsClone.setContainersPending(metrics.getPendingContainers());
metricsClone.setTotalMB(metrics.getTotalMB());
metricsClone.setTotalVirtualCores(metrics.getTotalVirtualCores());
metricsClone.setTotalNodes(metrics.getTotalNodes());
metricsClone.setLostNodes(metrics.getLostNodes());
metricsClone.setUnhealthyNodes(metrics.getUnhealthyNodes());
metricsClone.setDecommissioningNodes(metrics.getDecommissioningNodes());
metricsClone.setDecommissionedNodes(metrics.getDecommissionedNodes());
metricsClone.setRebootedNodes(metrics.getRebootedNodes());
metricsClone.setActiveNodes(metrics.getActiveNodes());
metricsClone.setShutdownNodes(metrics.getShutdownNodes());
return metricsClone;
}
private void setUpClusterMetrics(ClusterMetricsInfo metrics) {
Random rand = new Random(System.currentTimeMillis());
metrics.setAppsSubmitted(rand.nextInt(1000));
metrics.setAppsCompleted(rand.nextInt(1000));
metrics.setAppsPending(rand.nextInt(1000));
metrics.setAppsRunning(rand.nextInt(1000));
metrics.setAppsFailed(rand.nextInt(1000));
metrics.setAppsKilled(rand.nextInt(1000));
metrics.setReservedMB(rand.nextInt(1000));
metrics.setAvailableMB(rand.nextInt(1000));
metrics.setAllocatedMB(rand.nextInt(1000));
metrics.setReservedVirtualCores(rand.nextInt(1000));
metrics.setAvailableVirtualCores(rand.nextInt(1000));
metrics.setAllocatedVirtualCores(rand.nextInt(1000));
metrics.setContainersAllocated(rand.nextInt(1000));
metrics.setContainersReserved(rand.nextInt(1000));
metrics.setContainersPending(rand.nextInt(1000));
metrics.setTotalMB(rand.nextInt(1000));
metrics.setTotalVirtualCores(rand.nextInt(1000));
metrics.setTotalNodes(rand.nextInt(1000));
metrics.setLostNodes(rand.nextInt(1000));
metrics.setUnhealthyNodes(rand.nextInt(1000));
metrics.setDecommissioningNodes(rand.nextInt(1000));
metrics.setDecommissionedNodes(rand.nextInt(1000));
metrics.setRebootedNodes(rand.nextInt(1000));
metrics.setActiveNodes(rand.nextInt(1000));
metrics.setShutdownNodes(rand.nextInt(1000));
}
}