From 6990355e577ad19371cb656c250fb665ed14062f Mon Sep 17 00:00:00 2001 From: Luke Lu Date: Tue, 5 Nov 2013 21:23:53 +0000 Subject: [PATCH] YARN-311. RM/scheduler support for dynamic resource configuration. (Junping Du via llu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1539134 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 21 ++++- .../yarn/sls/scheduler/RMNodeWrapper.java | 20 +++- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/api/records/ResourceOption.java | 65 +++++++++++++ .../src/main/proto/yarn_protos.proto | 5 + .../records/impl/pb/ResourceOptionPBImpl.java | 89 ++++++++++++++++++ .../ResourceTrackerService.java | 4 +- .../server/resourcemanager/rmnode/RMNode.java | 19 +++- .../resourcemanager/rmnode/RMNodeImpl.java | 21 ++++- .../scheduler/SchedulerNode.java | 6 ++ .../scheduler/SchedulerUtils.java | 36 ++++++- .../scheduler/capacity/CapacityScheduler.java | 6 +- .../common/fica/FiCaSchedulerNode.java | 6 ++ .../scheduler/fair/FSSchedulerNode.java | 7 ++ .../scheduler/fair/FairScheduler.java | 3 + .../scheduler/fifo/FifoScheduler.java | 7 +- .../server/resourcemanager/MockNodes.java | 23 ++++- .../TestRMNodeTransitions.java | 5 +- .../scheduler/fifo/TestFifoScheduler.java | 93 +++++++++++++++++++ 19 files changed, 410 insertions(+), 29 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceOptionPBImpl.java diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index b1c76089ed7..76671572ce8 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -48,7 +49,7 @@ public class NodeInfo { private String nodeAddr; private String httpAddress; private int cmdPort; - private Resource perNode; + private volatile ResourceOption perNode; private String rackName; private String healthReport; private NodeState state; @@ -56,7 +57,7 @@ public class NodeInfo { private List toCleanUpApplications; public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, - Resource perNode, String rackName, String healthReport, + ResourceOption perNode, String rackName, String healthReport, int cmdPort, String hostName, NodeState state) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; @@ -104,6 +105,10 @@ public class NodeInfo { } public Resource getTotalCapability() { + return perNode.getResource(); + } + + public ResourceOption getResourceOption() { return perNode; } @@ -153,21 +158,27 @@ public class NodeInfo { // TODO Auto-generated method stub return null; } + + @Override + public void setResourceOption(ResourceOption resourceOption) { + perNode = resourceOption; + } } public static RMNode newNodeInfo(String rackName, String hostName, - final Resource resource, int port) { + final ResourceOption resourceOption, int port) { final NodeId nodeId = newNodeID(hostName, port); final String nodeAddr = hostName + ":" + port; final String httpAddress = hostName; return new FakeRMNodeImpl(nodeId, nodeAddr, httpAddress, - resource, rackName, "Me good", + resourceOption, rackName, "Me good", port, hostName, null); } public static RMNode newNodeInfo(String rackName, String hostName, final Resource resource) { - return newNodeInfo(rackName, hostName, resource, NODE_ID++); + return newNodeInfo(rackName, hostName, ResourceOption.newInstance(resource, + RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), NODE_ID++); } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index c6fa2f01358..bbe24c883c7 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -138,10 +139,19 @@ public class RMNodeWrapper implements RMNode { return updates; } -@Override -public String getNodeManagerVersion() { - // TODO Auto-generated method stub - return null; -} + @Override + public String getNodeManagerVersion() { + return node.getNodeManagerVersion(); + } + + @Override + public void setResourceOption(ResourceOption resourceOption) { + node.setResourceOption(resourceOption); + } + + @Override + public ResourceOption getResourceOption() { + return node.getResourceOption(); + } } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index dd760fd69f5..21a2e94b603 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -37,6 +37,9 @@ Release 2.3.0 - UNRELEASED YARN-1068. Add admin support for HA operations (Karthik Kambatla via bikas) + YARN-311. RM/scheduler support for dynamic resource configuration. + (Junping Du via llu) + IMPROVEMENTS YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java new file mode 100644 index 00000000000..2844a952ada --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Evolving +public abstract class ResourceOption { + + public static ResourceOption newInstance(Resource resource, + int overCommitTimeout){ + ResourceOption resourceOption = Records.newRecord(ResourceOption.class); + resourceOption.setResource(resource); + return resourceOption; + } + + /** + * Get the resource of the ResourceOption. + * @return resource of the ResourceOption + */ + @Private + @Evolving + public abstract Resource getResource(); + + @Private + @Evolving + protected abstract void setResource(Resource resource); + + /** + * Get timeout for tolerant of resource over-commitment + * Note: negative value means no timeout so that allocated containers will + * keep running until the end even under resource over-commitment cases. + * @return overCommitTimeout of the ResourceOption + */ + @Private + @Evolving + public abstract int getOverCommitTimeout(); + + @Private + @Evolving + protected abstract void setOverCommitTimeout(int overCommitTimeout); + + protected abstract void build(); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 31923068cbd..9d4d59e52a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -58,6 +58,11 @@ message ResourceProto { optional int32 virtual_cores = 2; } +message ResourceOptionProto { + optional ResourceProto resource = 1; + optional int32 over_commit_timeout = 2; +} + message PriorityProto { optional int32 priority = 1; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceOptionPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceOptionPBImpl.java new file mode 100644 index 00000000000..5440a8491e7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceOptionPBImpl.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto; + +import com.google.common.base.Preconditions; + +public class ResourceOptionPBImpl extends ResourceOption { + + ResourceOptionProto proto = null; + ResourceOptionProto.Builder builder = null; + private Resource resource = null; + + public ResourceOptionPBImpl() { + builder = ResourceOptionProto.newBuilder(); + } + + public ResourceOptionPBImpl(ResourceOptionProto proto) { + this.proto = proto; + this.resource = convertFromProtoFormat(proto.getResource()); + } + + public ResourceOptionProto getProto() { + return proto; + } + + @Override + public Resource getResource() { + return this.resource; + } + + @Override + protected void setResource(Resource resource) { + if (resource != null) { + Preconditions.checkNotNull(builder); + builder.setResource(convertToProtoFormat(resource)); + } + this.resource = resource; + } + + @Override + public int getOverCommitTimeout() { + Preconditions.checkNotNull(proto); + return proto.getOverCommitTimeout(); + } + + @Override + protected void setOverCommitTimeout(int overCommitTimeout) { + Preconditions.checkNotNull(builder); + builder.setOverCommitTimeout(overCommitTimeout); + } + + private ResourceProto convertToProtoFormat( + Resource resource) { + return ((ResourcePBImpl)resource).getProto(); + } + + private ResourcePBImpl convertFromProtoFormat( + ResourceProto p) { + return new ResourcePBImpl(p); + } + + @Override + protected void build() { + proto = builder.build(); + builder = null; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 7995fb37d60..23f87549ce3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -31,6 +31,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -234,7 +235,8 @@ public class ResourceTrackerService extends AbstractService implements .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), capability, nodeManagerVersion); + resolve(host), ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), + nodeManagerVersion); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index b3609747d1a..24793e86f17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; /** @@ -35,6 +37,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; */ public interface RMNode { + /** negative value means no timeout */ + public static final int OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT = -1; + /** * the node id of of this node. * @return the node id of this node. @@ -94,7 +99,19 @@ public interface RMNode { * the total available resource. * @return the total available resource. */ - public org.apache.hadoop.yarn.api.records.Resource getTotalCapability(); + public Resource getTotalCapability(); + + /** + * Set resource option with total available resource and overCommitTimoutMillis + * @param resourceOption + */ + public void setResourceOption(ResourceOption resourceOption); + + /** + * resource option with total available resource and overCommitTimoutMillis + * @return ResourceOption + */ + public ResourceOption getResourceOption(); /** * The rack name for this node manager. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 74291005a44..52bc285b3ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -92,7 +93,7 @@ public class RMNodeImpl implements RMNode, EventHandler { private final int httpPort; private final String nodeAddress; // The containerManager address private final String httpAddress; - private final Resource totalCapability; + private volatile ResourceOption resourceOption; private final Node node; private String healthReport; @@ -173,13 +174,13 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeEvent> stateMachine; public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, - int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { + int cmPort, int httpPort, Node node, ResourceOption resourceOption, String nodeManagerVersion) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; this.commandPort = cmPort; this.httpPort = httpPort; - this.totalCapability = capability; + this.resourceOption = resourceOption; this.nodeAddress = hostName + ":" + cmPort; this.httpAddress = hostName + ":" + httpPort; this.node = node; @@ -235,14 +236,24 @@ public class RMNodeImpl implements RMNode, EventHandler { @Override public Resource getTotalCapability() { - return this.totalCapability; + return this.resourceOption.getResource(); + } + + @Override + public void setResourceOption(ResourceOption resourceOption) { + this.resourceOption = resourceOption; + } + + @Override + public ResourceOption getResourceOption(){ + return this.resourceOption; } @Override public String getRackName() { return node.getNetworkLocation(); } - + @Override public Node getNode() { return this.node; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 05872f9a31f..524b1abd5f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -67,6 +67,12 @@ public abstract class SchedulerNode { * @return number of active containers on the node */ public abstract int getNumContainers(); + + /** + * Apply delta resource on node's available resource. + * @param deltaResource the delta of resource need to apply to node + */ + public abstract void applyDeltaOnAvailableResource(Resource deltaResource); /** * Get total resources on the node. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index ef71dccf510..572a9f92e40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -19,20 +19,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.List; +import org.apache.commons.logging.Log; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -147,6 +146,37 @@ public class SchedulerUtils { maximumResource, minimumResource); ask.setCapability(normalized); } + + /** + * Update resource in SchedulerNode if any resource change in RMNode. + * @param node SchedulerNode with old resource view + * @param rmNode RMNode with new resource view + * @param clusterResource the cluster's resource that need to update + * @param log Scheduler's log for resource change + */ + public static void updateResourceIfChanged(SchedulerNode node, + RMNode rmNode, Resource clusterResource, Log log) { + Resource oldAvailableResource = node.getAvailableResource(); + Resource newAvailableResource = Resources.subtract( + rmNode.getTotalCapability(), node.getUsedResource()); + + if (!newAvailableResource.equals(oldAvailableResource)) { + Resource deltaResource = Resources.subtract(newAvailableResource, + oldAvailableResource); + // Reflect resource change to scheduler node. + node.applyDeltaOnAvailableResource(deltaResource); + // Reflect resource change to clusterResource. + Resources.addTo(clusterResource, deltaResource); + // TODO process resource over-commitment case (allocated containers + // > total capacity) in different option by getting value of + // overCommitTimeoutMillis. + + // Log resource change + log.info("Resource change on node: " + rmNode.getNodeAddress() + + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: " + + deltaResource.getMemory() +"MB"); + } + } /** * Utility method to normalize a list of resource requests, by insuring that diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 241fe2f538a..e61ff938fa1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -629,6 +629,10 @@ public class CapacityScheduler } FiCaSchedulerNode node = getNode(nm.getNodeID()); + + // Update resource if any change + SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG); + List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); List completedContainers = new ArrayList(); @@ -695,7 +699,7 @@ public class CapacityScheduler node.getReservedContainer().getContainerId().getApplicationAttemptId() ); } - + } private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 400b3153dcf..23068fefde3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -268,4 +268,10 @@ public class FiCaSchedulerNode extends SchedulerNode { return reservedContainer; } + @Override + public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { + // we can only adjust available resource if total resource is changed. + Resources.addTo(this.availableResource, deltaResource); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index d84547a3ffb..97ea6d46d65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -269,4 +269,11 @@ public class FSSchedulerNode extends SchedulerNode { public synchronized AppSchedulable getReservedAppSchedulable() { return reservedAppSchedulable; } + + @Override + public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { + // we can only adjust available resource if total resource is changed. + Resources.addTo(this.availableResource, deltaResource); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 71c6cbd236e..422a43b668b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -920,6 +920,9 @@ public class FairScheduler implements ResourceScheduler { eventLog.log("HEARTBEAT", nm.getHostName()); FSSchedulerNode node = nodes.get(nm.getNodeID()); + // Update resource if any change + SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG); + List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); List completedContainers = new ArrayList(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 626c482e219..4242d02c03d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -100,7 +100,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable { private final static List EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY); private RMContext rmContext; - private Map nodes = new ConcurrentHashMap(); + protected Map nodes = new ConcurrentHashMap(); private boolean initialized; private Resource minimumAllocation; @@ -628,6 +628,9 @@ public class FifoScheduler implements ResourceScheduler, Configurable { private synchronized void nodeUpdate(RMNode rmNode) { FiCaSchedulerNode node = getNode(rmNode.getNodeID()); + // Update resource if any change + SchedulerUtils.updateResourceIfChanged(node, rmNode, clusterResource, LOG); + List containerInfoList = rmNode.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); List completedContainers = new ArrayList(); @@ -661,7 +664,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable { metrics.setAvailableResourcesToQueue( Resources.subtract(clusterResource, usedResource)); - } + } @Override public void handle(SchedulerEvent event) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 049bac860b7..8ef01d998d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -93,14 +94,14 @@ public class MockNodes { private String nodeAddr; private String httpAddress; private int cmdPort; - private Resource perNode; + private ResourceOption perNode; private String rackName; private String healthReport; private long lastHealthReportTime; private NodeState state; public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, - Resource perNode, String rackName, String healthReport, + ResourceOption perNode, String rackName, String healthReport, long lastHealthReportTime, int cmdPort, String hostName, NodeState state) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; @@ -146,7 +147,7 @@ public class MockNodes { @Override public Resource getTotalCapability() { - return this.perNode; + return this.perNode.getResource(); } @Override @@ -202,6 +203,17 @@ public class MockNodes { public long getLastHealthReportTime() { return lastHealthReportTime; } + + @Override + public void setResourceOption(ResourceOption resourceOption) { + this.perNode = resourceOption; + } + + @Override + public ResourceOption getResourceOption(){ + return this.perNode; + } + }; private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) { @@ -220,8 +232,9 @@ public class MockNodes { final String httpAddress = httpAddr; String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; - return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName, - healthReport, 0, nid, hostName, state); + return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, + ResourceOption.newInstance(perNode, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), + rackName, healthReport, 0, nid, hostName, state); } public static RMNode nodeInfo(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 479128a4f22..82046c7a9de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -36,10 +36,12 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; @@ -451,7 +453,8 @@ public class TestRMNodeTransitions { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, - null, capability, null); + null, ResourceOption.newInstance(capability, + RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), null); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index ee302b94509..238459097e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -18,10 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; +import static org.junit.Assert.assertEquals; + import java.io.IOException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import junit.framework.Assert; @@ -32,9 +36,11 @@ import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -55,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -212,6 +219,92 @@ public class TestFifoScheduler { Assert.assertEquals(3, info.getLiveContainers().size()); } + @Test(timeout=2000) + public void testUpdateResourceOnNode() throws Exception { + AsyncDispatcher dispatcher = new InlineDispatcher(); + Configuration conf = new Configuration(); + RMContainerTokenSecretManager containerTokenSecretManager = + new RMContainerTokenSecretManager(conf); + containerTokenSecretManager.rollMasterKey(); + NMTokenSecretManagerInRM nmTokenSecretManager = + new NMTokenSecretManagerInRM(conf); + nmTokenSecretManager.rollMasterKey(); + RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, + null, containerTokenSecretManager, nmTokenSecretManager, null); + + FifoScheduler scheduler = new FifoScheduler(){ + @SuppressWarnings("unused") + public Map getNodes(){ + return nodes; + } + }; + scheduler.reinitialize(new Configuration(), rmContext); + RMNode node0 = MockNodes.newNodeInfo(1, + Resources.createResource(2048, 4), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0); + scheduler.handle(nodeEvent1); + + Method method = scheduler.getClass().getDeclaredMethod("getNodes"); + @SuppressWarnings("unchecked") + Map schedulerNodes = + (Map) method.invoke(scheduler); + assertEquals(schedulerNodes.values().size(), 1); + + // set resource of RMNode to 1024 and verify it works. + node0.setResourceOption(ResourceOption.newInstance( + Resources.createResource(1024, 4), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)); + assertEquals(node0.getTotalCapability().getMemory(), 1024); + // verify that SchedulerNode's resource hasn't been changed. + assertEquals(schedulerNodes.get(node0.getNodeID()). + getAvailableResource().getMemory(), 2048); + // now, NM heartbeat comes. + NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); + scheduler.handle(node0Update); + // SchedulerNode's available resource is changed. + assertEquals(schedulerNodes.get(node0.getNodeID()). + getAvailableResource().getMemory(), 1024); + QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false); + Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity()); + + int _appId = 1; + int _appAttemptId = 1; + ApplicationAttemptId appAttemptId = createAppAttemptId(_appId, + _appAttemptId); + AppAddedSchedulerEvent appEvent1 = new AppAddedSchedulerEvent(appAttemptId, + "queue1", "user1"); + scheduler.handle(appEvent1); + + int memory = 1024; + int priority = 1; + + List ask = new ArrayList(); + ResourceRequest nodeLocal = createResourceRequest(memory, + node0.getHostName(), priority, 1); + ResourceRequest rackLocal = createResourceRequest(memory, + node0.getRackName(), priority, 1); + ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority, + 1); + ask.add(nodeLocal); + ask.add(rackLocal); + ask.add(any); + scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null); + + // Before the node update event, there are one local request + Assert.assertEquals(1, nodeLocal.getNumContainers()); + + // Now schedule. + scheduler.handle(node0Update); + + // After the node update event, check no local request + Assert.assertEquals(0, nodeLocal.getNumContainers()); + // Also check that one container was scheduled + SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId); + Assert.assertEquals(1, info.getLiveContainers().size()); + // And check the default Queue now is full. + queueInfo = scheduler.getQueueInfo(null, false, false); + Assert.assertEquals(1.0f, queueInfo.getCurrentCapacity()); + } + // @Test public void testFifoScheduler() throws Exception {