From b45b1394b60f9a803ae5a8e955878d6a1e7c47c7 Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Mon, 26 Aug 2013 15:42:43 +0000 Subject: [PATCH] YARN-1008. MiniYARNCluster with multiple nodemanagers, all nodes have same key for allocations. (tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1517564 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../hadoop/yarn/conf/YarnConfiguration.java | 9 ++++ .../scheduler/AppSchedulingInfo.java | 2 +- .../scheduler/SchedulerNode.java | 14 ++++-- .../scheduler/capacity/CapacityScheduler.java | 7 ++- .../CapacitySchedulerConfiguration.java | 5 ++ .../scheduler/capacity/LeafQueue.java | 8 +-- .../common/fica/FiCaSchedulerNode.java | 12 +++-- .../common/fica/FiCaSchedulerUtils.java | 4 +- .../scheduler/fair/AppSchedulable.java | 6 +-- .../scheduler/fair/FSSchedulerNode.java | 12 +++-- .../scheduler/fair/FairScheduler.java | 7 +-- .../fair/FairSchedulerConfiguration.java | 7 ++- .../scheduler/fifo/FifoScheduler.java | 9 +++- .../server/resourcemanager/MockNodes.java | 14 ++++-- .../server/resourcemanager/NodeManager.java | 2 +- .../capacity/TestChildQueueOrder.java | 3 +- .../scheduler/capacity/TestParentQueue.java | 2 +- .../scheduler/capacity/TestUtils.java | 2 +- .../scheduler/fair/TestFairScheduler.java | 50 +++++++++++++++++++ .../hadoop/yarn/server/MiniYARNCluster.java | 15 ++++++ 21 files changed, 156 insertions(+), 37 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3d1d44e2824..c4069888867 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -81,6 +81,9 @@ Release 2.1.1-beta - UNRELEASED YARN-1094. Fixed a blocker with RM restart code because of which RM crashes when try to recover an existing app. (vinodkv) + YARN-1008. MiniYARNCluster with multiple nodemanagers, all nodes have same + key for allocations. (tucu) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index f57091e4380..904b4d57b7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -132,6 +132,15 @@ public class YarnConfiguration extends Configuration { RM_PREFIX + "scheduler.client.thread-count"; public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50; + /** If the port should be included or not in the node name. The node name + * is used by the scheduler for resource requests allocation location + * matching. Typically this is just the hostname, using the port is needed + * when using minicluster and specific NM are required.*/ + public static final String RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME = + YARN_PREFIX + "scheduler.include-port-in-node-name"; + public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = + false; + /** * Enable periodic monitor threads. * @see #RM_SCHEDULER_MONITOR_POLICIES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 1ff00be4ce6..6f8144d4c69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -281,7 +281,7 @@ public class AppSchedulingInfo { // Update future requirements nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1); if (nodeLocalRequest.getNumContainers() == 0) { - this.requests.get(priority).remove(node.getHostName()); + this.requests.get(priority).remove(node.getNodeName()); } ResourceRequest rackLocalRequest = requests.get(priority).get( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index a08ba7090ab..8a80bf8cf9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; /** * Represents a YARN Cluster Node from the viewpoint of the scheduler. @@ -30,10 +31,17 @@ import org.apache.hadoop.yarn.api.records.Resource; public abstract class SchedulerNode { /** - * Get hostname. - * @return hostname + * Get the name of the node for scheduling matching decisions. + *

+ * Typically this is the 'hostname' reported by the node, but it could be + * configured to be 'hostname:port' reported by the node via the + * {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} constant. + * The main usecase of this is Yarn minicluster to be able to differentiate + * node manager instances by their port number. + * + * @return name of the node for scheduling matching decisions. */ - public abstract String getHostName(); + public abstract String getNodeName(); /** * Get rackname. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 29c4d4b9de3..2efb9ad6719 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -185,7 +185,8 @@ public class CapacityScheduler private boolean initialized = false; private ResourceCalculator calculator; - + private boolean usePortForNodeName; + public CapacityScheduler() {} @Override @@ -256,6 +257,7 @@ public class CapacityScheduler this.minimumAllocation = this.conf.getMinimumAllocation(); this.maximumAllocation = this.conf.getMaximumAllocation(); this.calculator = this.conf.getResourceCalculator(); + this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.rmContext = rmContext; @@ -759,7 +761,8 @@ public class CapacityScheduler } private synchronized void addNode(RMNode nodeManager) { - this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager)); + this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, + usePortForNodeName)); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); ++numNodeManagers; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 6d209ca1a7a..6fceabf0dec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -338,6 +338,11 @@ public class CapacitySchedulerConfiguration extends Configuration { this); } + public boolean getUsePortForNodeName() { + return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, + YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); + } + public void setResourceComparator( Class resourceCalculatorClass) { setClass( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index c2c5d27576d..41b3f5e3037 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -801,7 +801,7 @@ public class LeafQueue implements CSQueue { assignContainers(Resource clusterResource, FiCaSchedulerNode node) { if(LOG.isDebugEnabled()) { - LOG.debug("assignContainers: node=" + node.getHostName() + LOG.debug("assignContainers: node=" + node.getNodeName() + " #applications=" + activeApplications.size()); } @@ -1130,7 +1130,7 @@ public class LeafQueue implements CSQueue { // Data-local ResourceRequest nodeLocalResourceRequest = - application.getResourceRequest(priority, node.getHostName()); + application.getResourceRequest(priority, node.getNodeName()); if (nodeLocalResourceRequest != null) { assigned = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, @@ -1257,7 +1257,7 @@ public class LeafQueue implements CSQueue { if (type == NodeType.NODE_LOCAL) { // Now check if we need containers on this host... ResourceRequest nodeLocalRequest = - application.getResourceRequest(priority, node.getHostName()); + application.getResourceRequest(priority, node.getNodeName()); if (nodeLocalRequest != null) { return nodeLocalRequest.getNumContainers() > 0; } @@ -1302,7 +1302,7 @@ public class LeafQueue implements CSQueue { FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer) { if (LOG.isDebugEnabled()) { - LOG.debug("assignContainers: node=" + node.getHostName() + LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId().getId() + " priority=" + priority.getPriority() + " request=" + request + " type=" + type); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index bb9ba92e0ee..7a306ec4281 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -59,11 +59,17 @@ public class FiCaSchedulerNode extends SchedulerNode { new HashMap(); private final RMNode rmNode; + private final String nodeName; - public FiCaSchedulerNode(RMNode node) { + public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) { this.rmNode = node; this.availableResource.setMemory(node.getTotalCapability().getMemory()); this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores()); + if (usePortForNodeName) { + nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); + } else { + nodeName = rmNode.getHostName(); + } } public RMNode getRMNode() { @@ -79,8 +85,8 @@ public class FiCaSchedulerNode extends SchedulerNode { } @Override - public String getHostName() { - return this.rmNode.getHostName(); + public String getNodeName() { + return nodeName; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java index 1e96949787c..9bece9ba50e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java @@ -24,9 +24,9 @@ public class FiCaSchedulerUtils { public static boolean isBlacklisted(FiCaSchedulerApp application, FiCaSchedulerNode node, Log LOG) { - if (application.isBlacklisted(node.getHostName())) { + if (application.isBlacklisted(node.getNodeName())) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping 'host' " + node.getHostName() + + LOG.debug("Skipping 'host' " + node.getNodeName() + " for " + application.getApplicationId() + " since it has been blacklisted"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index bb3190bafa9..14ec99cada5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -185,7 +185,7 @@ public class AppSchedulable extends Schedulable { */ private void reserve(Priority priority, FSSchedulerNode node, Container container, boolean alreadyReserved) { - LOG.info("Making reservation: node=" + node.getHostName() + + LOG.info("Making reservation: node=" + node.getNodeName() + " app_id=" + app.getApplicationId()); if (!alreadyReserved) { getMetrics().reserveResource(app.getUser(), container.getResource()); @@ -309,7 +309,7 @@ public class AppSchedulable extends Schedulable { ResourceRequest rackLocalRequest = app.getResourceRequest(priority, node.getRackName()); ResourceRequest localRequest = app.getResourceRequest(priority, - node.getHostName()); + node.getNodeName()); if (localRequest != null && !localRequest.getRelaxLocality()) { LOG.warn("Relax locality off is not supported on local request: " @@ -369,7 +369,7 @@ public class AppSchedulable extends Schedulable { public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) { ResourceRequest anyRequest = app.getResourceRequest(prio, ResourceRequest.ANY); ResourceRequest rackRequest = app.getResourceRequest(prio, node.getRackName()); - ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getHostName()); + ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getNodeName()); return // There must be outstanding requests at the given priority: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index cc15a5d9c7c..bd29f821bb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -63,10 +63,16 @@ public class FSSchedulerNode extends SchedulerNode { new HashMap(); private final RMNode rmNode; + private final String nodeName; - public FSSchedulerNode(RMNode node) { + public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { this.rmNode = node; this.availableResource = Resources.clone(node.getTotalCapability()); + if (usePortForNodeName) { + nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); + } else { + nodeName = rmNode.getHostName(); + } } public RMNode getRMNode() { @@ -82,8 +88,8 @@ public class FSSchedulerNode extends SchedulerNode { } @Override - public String getHostName() { - return rmNode.getHostName(); + public String getNodeName() { + return nodeName; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index b86b031ecf6..7f315781ef9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -35,7 +35,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -122,6 +121,7 @@ public class FairScheduler implements ResourceScheduler { private Resource incrAllocation; private QueueManager queueMgr; private Clock clock; + private boolean usePortForNodeName; private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -751,7 +751,7 @@ public class FairScheduler implements ResourceScheduler { } private synchronized void addNode(RMNode node) { - nodes.put(node.getNodeID(), new FSSchedulerNode(node)); + nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName)); Resources.addTo(clusterCapacity, node.getTotalCapability()); updateRootQueueMetrics(); @@ -1065,7 +1065,8 @@ public class FairScheduler implements ResourceScheduler { sizeBasedWeight = this.conf.getSizeBasedWeight(); preemptionInterval = this.conf.getPreemptionInterval(); waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); - + usePortForNodeName = this.conf.getUsePortForNodeName(); + if (!initialized) { rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); this.rmContext = rmContext; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 0ab82638dd9..acdd40e26ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -166,7 +166,12 @@ public class FairSchedulerConfiguration extends Configuration { public int getWaitTimeBeforeKill() { return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL); } - + + public boolean getUsePortForNodeName() { + return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, + YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); + } + /** * Parses a resource config value of a form like "1024", "1024 mb", * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index d971f3b4496..115d2089c34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -111,6 +111,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable { private boolean initialized; private Resource minimumAllocation; private Resource maximumAllocation; + private boolean usePortForNodeName; private Map applications = new TreeMap(); @@ -233,6 +234,9 @@ public class FifoScheduler implements ResourceScheduler, Configurable { Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + this.usePortForNodeName = conf.getBoolean( + YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, + YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, conf); this.activeUsersManager = new ActiveUsersManager(metrics); @@ -490,7 +494,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable { FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = - application.getResourceRequest(priority, node.getHostName()); + application.getResourceRequest(priority, node.getNodeName()); if (request != null) { // Don't allocate on this node if we don't need containers on this rack ResourceRequest rackRequest = @@ -801,7 +805,8 @@ public class FifoScheduler implements ResourceScheduler, Configurable { } private synchronized void addNode(RMNode nodeManager) { - this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager)); + this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, + usePortForNodeName)); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 83b81a18504..d69828d0fc4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -200,15 +200,14 @@ public class MockNodes { }; private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) { - return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null); + return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123); } private static RMNode buildRMNode(int rack, final Resource perNode, - NodeState state, String httpAddr, int hostnum, String hostName) { + NodeState state, String httpAddr, int hostnum, String hostName, int port) { final String rackName = "rack"+ rack; final int nid = hostnum; final String nodeAddr = hostName + ":" + nid; - final int port = 123; if (hostName == null) { hostName = "host"+ nid; } @@ -230,12 +229,17 @@ public class MockNodes { } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null); + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123); } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum, String hostName) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName); + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, 123); + } + + public static RMNode newNodeInfo(int rack, final Resource perNode, + int hostnum, String hostName, int port) { + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index 2c9d67845a4..f943101e1cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -101,7 +101,7 @@ public class NodeManager implements ContainerManagementProtocol { request.setNodeId(this.nodeId); resourceTrackerService.registerNodeManager(request); this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get( - this.nodeId)); + this.nodeId), false); // Sanity check Assert.assertEquals(capability.getMemory(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 014385c12eb..3c55b42006f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -26,7 +26,6 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.util.HashMap; @@ -126,7 +125,7 @@ public class TestChildQueueOrder { throw new Exception(); } catch (Exception e) { LOG.info("FOOBAR q.assignContainers q=" + queue.getQueueName() + - " alloc=" + allocation + " node=" + node.getHostName()); + " alloc=" + allocation + " node=" + node.getNodeName()); } final Resource allocatedResource = Resources.createResource(allocation); if (queue instanceof ParentQueue) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index c5dbfde5d3d..03480810083 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -138,7 +138,7 @@ public class TestParentQueue { throw new Exception(); } catch (Exception e) { LOG.info("FOOBAR q.assignContainers q=" + queue.getQueueName() + - " alloc=" + allocation + " node=" + node.getHostName()); + " alloc=" + allocation + " node=" + node.getNodeName()); } final Resource allocatedResource = Resources.createResource(allocation); if (queue instanceof ParentQueue) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 6e7fe789826..b974528a3cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -160,7 +160,7 @@ public class TestUtils { when(rmNode.getHostName()).thenReturn(host); when(rmNode.getRackName()).thenReturn(rack); - FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode)); + FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode, false)); LOG.info("node = " + host + " avail=" + node.getAvailableResource()); return node; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index b0327c1de58..0bcfbb910ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2146,4 +2146,54 @@ public class TestFairScheduler { Assert.assertEquals(2, app3.getLiveContainers().size()); Assert.assertEquals(2, app4.getLiveContainers().size()); } + + @Test(timeout = 30000) + public void testHostPortNodeName() throws Exception { + scheduler.getConf().setBoolean(YarnConfiguration + .RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); + scheduler.reinitialize(scheduler.getConf(), + resourceManager.getRMContext()); + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), + 1, "127.0.0.1", 1); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), + 2, "127.0.0.1", 2); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + "user1", 0); + + ResourceRequest nodeRequest = createResourceRequest(1024, + node1.getNodeID().getHost() + ":" + node1.getNodeID().getPort(), 1, + 1, true); + ResourceRequest rackRequest = createResourceRequest(1024, + node1.getRackName(), 1, 1, false); + ResourceRequest anyRequest = createResourceRequest(1024, + ResourceRequest.ANY, 1, 1, false); + createSchedulingRequestExistingApplication(nodeRequest, attId1); + createSchedulingRequestExistingApplication(rackRequest, attId1); + createSchedulingRequestExistingApplication(anyRequest, attId1); + + scheduler.update(); + + NodeUpdateSchedulerEvent node1UpdateEvent = new + NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent node2UpdateEvent = new + NodeUpdateSchedulerEvent(node2); + + // no matter how many heartbeats, node2 should never get a container + FSSchedulerApp app = scheduler.applications.get(attId1); + for (int i = 0; i < 10; i++) { + scheduler.handle(node2UpdateEvent); + assertEquals(0, app.getLiveContainers().size()); + assertEquals(0, app.getReservedContainers().size()); + } + // then node1 should get the container + scheduler.handle(node1UpdateEvent); + assertEquals(1, app.getLiveContainers().size()); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index cba27e4b323..1281c2409bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -53,6 +53,21 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +/** + * Embedded Yarn minicluster for testcases that need to interact with a cluster. + *

+ * In a real cluster, resource request matching is done using the hostname, and + * by default Yarn minicluster works in the exact same way as a real cluster. + *

+ * If a testcase needs to use multiple nodes and exercise resource request + * matching to a specific node, then the property + * {@YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} should be set + * true in the configuration used to initialize the minicluster. + *

+ * With this property set to true, the matching will be done using + * the hostname:port of the namenodes. In such case, the AM must + * do resource request using hostname:port as the location. + */ public class MiniYARNCluster extends CompositeService { private static final Log LOG = LogFactory.getLog(MiniYARNCluster.class);