diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 1d573822d9b..029fa877f0b 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -55,7 +54,7 @@ public class NodeInfo { private String nodeAddr; private String httpAddress; private int cmdPort; - private volatile ResourceOption perNode; + private volatile Resource perNode; private String rackName; private String healthReport; private NodeState state; @@ -63,7 +62,7 @@ public class NodeInfo { private List toCleanUpApplications; public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, - ResourceOption perNode, String rackName, String healthReport, + Resource perNode, String rackName, String healthReport, int cmdPort, String hostName, NodeState state) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; @@ -111,10 +110,6 @@ public class NodeInfo { } public Resource getTotalCapability() { - return perNode.getResource(); - } - - public ResourceOption getResourceOption() { return perNode; } @@ -159,32 +154,26 @@ public class NodeInfo { return list; } - @Override - public String getNodeManagerVersion() { - // TODO Auto-generated method stub - return null; - } - @Override - public void setResourceOption(ResourceOption resourceOption) { - perNode = resourceOption; + public String getNodeManagerVersion() { + return null; } + } public static RMNode newNodeInfo(String rackName, String hostName, - final ResourceOption resourceOption, int port) { + final Resource resource, int port) { final NodeId nodeId = newNodeID(hostName, port); final String nodeAddr = hostName + ":" + port; final String httpAddress = hostName; return new FakeRMNodeImpl(nodeId, nodeAddr, httpAddress, - resourceOption, rackName, "Me good", + resource, rackName, "Me good", port, hostName, null); } public static RMNode newNodeInfo(String rackName, String hostName, final Resource resource) { - return newNodeInfo(rackName, hostName, ResourceOption.newInstance(resource, - RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), NODE_ID++); + return newNodeInfo(rackName, hostName, resource, NODE_ID++); } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index da9b56fd546..7eca66fb779 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -26,7 +26,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -148,14 +147,4 @@ public class RMNodeWrapper implements RMNode { return node.getNodeManagerVersion(); } - @Override - public void setResourceOption(ResourceOption resourceOption) { - node.setResourceOption(resourceOption); - } - - @Override - public ResourceOption getResourceOption() { - return node.getResourceOption(); - } - } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cada9bc8a5f..4b56d289732 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -143,6 +143,9 @@ Release 2.6.0 - UNRELEASED YARN-2406. Move RM recovery related proto to yarn_server_resourcemanager_recovery.proto. (Tsuyoshi Ozawa via jianhe) + YARN-1506. Changed RMNode/SchedulerNode to update resource with event + notification. (Junping Du via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java index 380f38d74a1..1ca90ccedf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java @@ -62,6 +62,8 @@ public abstract class ResourceOption { @Evolving protected abstract void setOverCommitTimeout(int overCommitTimeout); + @Private + @Evolving protected abstract void build(); @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/UpdateNodeResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/UpdateNodeResourceResponse.java index 5155101d244..8603ea31c9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/UpdateNodeResourceResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/UpdateNodeResourceResponse.java @@ -17,9 +17,10 @@ */ package org.apache.hadoop.yarn.server.api.protocolrecords; -import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.util.Records; /** *

The response sent by the ResourceManager to Admin client on @@ -30,8 +31,13 @@ import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; * @see ResourceManagerAdministrationProtocol#updateNodeResource( * UpdateNodeResourceRequest) */ -@Public +@Private @Evolving -public interface UpdateNodeResourceResponse { +public abstract class UpdateNodeResourceResponse { + public static UpdateNodeResourceResponse newInstance(){ + UpdateNodeResourceResponse response = + Records.newRecord(UpdateNodeResourceResponse.class); + return response; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceOptionPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceOptionPBImpl.java index 79f479ee99d..5a4a44e648a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceOptionPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceOptionPBImpl.java @@ -22,14 +22,15 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProtoOrBuilder; import com.google.common.base.Preconditions; public class ResourceOptionPBImpl extends ResourceOption { - ResourceOptionProto proto = null; + ResourceOptionProto proto = ResourceOptionProto.getDefaultInstance(); ResourceOptionProto.Builder builder = null; - private Resource resource = null; + boolean viaProto = false; public ResourceOptionPBImpl() { builder = ResourceOptionProto.newBuilder(); @@ -37,39 +38,46 @@ public class ResourceOptionPBImpl extends ResourceOption { public ResourceOptionPBImpl(ResourceOptionProto proto) { this.proto = proto; - this.resource = convertFromProtoFormat(proto.getResource()); + viaProto = true; } public ResourceOptionProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; return proto; } @Override public Resource getResource() { - return this.resource; + ResourceOptionProtoOrBuilder p = viaProto ? proto : builder; + return convertFromProtoFormat(p.getResource()); } @Override protected void setResource(Resource resource) { - if (resource != null) { - Preconditions.checkNotNull(builder); - builder.setResource(convertToProtoFormat(resource)); - } - this.resource = resource; + maybeInitBuilder(); + builder.setResource(convertToProtoFormat(resource)); } @Override public int getOverCommitTimeout() { - Preconditions.checkNotNull(proto); - return proto.getOverCommitTimeout(); + ResourceOptionProtoOrBuilder p = viaProto ? proto : builder; + return p.getOverCommitTimeout(); } @Override protected void setOverCommitTimeout(int overCommitTimeout) { - Preconditions.checkNotNull(builder); + maybeInitBuilder(); builder.setOverCommitTimeout(overCommitTimeout); } + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ResourceOptionProto.newBuilder(proto); + } + viaProto = false; + } + private ResourceProto convertToProtoFormat( Resource resource) { return ((ResourcePBImpl)resource).getProto(); @@ -83,6 +91,7 @@ public class ResourceOptionPBImpl extends ResourceOption { @Override protected void build() { proto = builder.build(); + viaProto = true; builder = null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/UpdateNodeResourceResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/UpdateNodeResourceResponsePBImpl.java index f314f861b65..3e2aca559ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/UpdateNodeResourceResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/UpdateNodeResourceResponsePBImpl.java @@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; -public class UpdateNodeResourceResponsePBImpl implements UpdateNodeResourceResponse { +public class UpdateNodeResourceResponsePBImpl extends UpdateNodeResourceResponse { UpdateNodeResourceResponseProto proto = UpdateNodeResourceResponseProto.getDefaultInstance(); UpdateNodeResourceResponseProto.Builder builder = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index c47f49e207e..ff0a249bce9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import com.google.common.annotations.VisibleForTesting; @@ -513,9 +514,20 @@ public class AdminService extends CompositeService implements return UserGroupInformation.createRemoteUser(user).getGroupNames(); } + @SuppressWarnings("unchecked") @Override public UpdateNodeResourceResponse updateNodeResource( UpdateNodeResourceRequest request) throws YarnException, IOException { + String argName = "updateNodeResource"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not update node resource."); + throwStandbyException(); + } + Map nodeResourceMap = request.getNodeResourceMap(); Set nodeIds = nodeResourceMap.keySet(); // verify nodes are all valid first. @@ -536,21 +548,31 @@ public class AdminService extends CompositeService implements // Notice: it is still possible to have invalid NodeIDs as nodes decommission // may happen just at the same time. This time, only log and skip absent // nodes without throwing any exceptions. + boolean allSuccess = true; for (Map.Entry entry : nodeResourceMap.entrySet()) { ResourceOption newResourceOption = entry.getValue(); NodeId nodeId = entry.getKey(); RMNode node = this.rmContext.getRMNodes().get(nodeId); + if (node == null) { LOG.warn("Resource update get failed on an unrecognized node: " + nodeId); + allSuccess = false; } else { - node.setResourceOption(newResourceOption); - LOG.info("Update resource successfully on node(" + node.getNodeID() - +") with resource(" + newResourceOption.toString() + ")"); + // update resource to RMNode + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption)); + LOG.info("Update resource on node(" + node.getNodeID() + + ") with resource(" + newResourceOption.toString() + ")"); + } } - UpdateNodeResourceResponse response = recordFactory.newRecordInstance( - UpdateNodeResourceResponse.class); - return response; + if (allSuccess) { + RMAuditLogger.logSuccess(user.getShortUserName(), argName, + "AdminService"); + } + UpdateNodeResourceResponse response = + UpdateNodeResourceResponse.newInstance(); + return response; } private synchronized Configuration getConfiguration(Configuration conf, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index b532dd56309..4798120c0da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -299,8 +298,7 @@ public class ResourceTrackerService extends AbstractService implements .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), - nodeManagerVersion); + resolve(host), capability, nodeManagerVersion); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 24793e86f17..a423ea50675 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -101,18 +101,6 @@ public interface RMNode { */ public Resource getTotalCapability(); - /** - * Set resource option with total available resource and overCommitTimoutMillis - * @param resourceOption - */ - public void setResourceOption(ResourceOption resourceOption); - - /** - * resource option with total available resource and overCommitTimoutMillis - * @return ResourceOption - */ - public ResourceOption getResourceOption(); - /** * The rack name for this node manager. * @return the rack name. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index ef644be7000..c0096b9b90d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -24,6 +24,9 @@ public enum RMNodeEventType { // Source: AdminService DECOMMISSION, + + // Source: AdminService, ResourceTrackerService + RESOURCE_UPDATE, // ResourceTrackerService STATUS_UPDATE, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 9ead898db40..3ce641662cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils.ContainerIdComparator; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -96,7 +97,7 @@ public class RMNodeImpl implements RMNode, EventHandler { private int httpPort; private final String nodeAddress; // The containerManager address private String httpAddress; - private volatile ResourceOption resourceOption; + private volatile Resource totalCapability; private final Node node; private String healthReport; @@ -129,6 +130,9 @@ public class RMNodeImpl implements RMNode, EventHandler { //Transitions from NEW state .addTransition(NodeState.NEW, NodeState.RUNNING, RMNodeEventType.STARTED, new AddNodeTransition()) + .addTransition(NodeState.NEW, NodeState.NEW, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenUnusableTransition()) //Transitions from RUNNING state .addTransition(NodeState.RUNNING, @@ -149,6 +153,23 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) + + //Transitions from REBOOTED state + .addTransition(NodeState.REBOOTED, NodeState.REBOOTED, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenUnusableTransition()) + + //Transitions from DECOMMISSIONED state + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenUnusableTransition()) + + //Transitions from LOST state + .addTransition(NodeState.LOST, NodeState.LOST, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenUnusableTransition()) //Transitions from UNHEALTHY state .addTransition(NodeState.UNHEALTHY, @@ -169,6 +190,8 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) // create the topology tables .installTopology(); @@ -177,13 +200,13 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeEvent> stateMachine; public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, - int cmPort, int httpPort, Node node, ResourceOption resourceOption, String nodeManagerVersion) { + int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; this.commandPort = cmPort; this.httpPort = httpPort; - this.resourceOption = resourceOption; + this.totalCapability = capability; this.nodeAddress = hostName + ":" + cmPort; this.httpAddress = hostName + ":" + httpPort; this.node = node; @@ -239,17 +262,7 @@ public class RMNodeImpl implements RMNode, EventHandler { @Override public Resource getTotalCapability() { - return this.resourceOption.getResource(); - } - - @Override - public void setResourceOption(ResourceOption resourceOption) { - this.resourceOption = resourceOption; - } - - @Override - public ResourceOption getResourceOption(){ - return this.resourceOption; + return this.totalCapability; } @Override @@ -473,6 +486,13 @@ public class RMNodeImpl implements RMNode, EventHandler { context.getDispatcher().getEventHandler() .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); } + + private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, + RMNodeResourceUpdateEvent event){ + ResourceOption resourceOption = event.getResourceOption(); + // Set resource on RMNode + rmNode.totalCapability = resourceOption.getResource(); + } public static class AddNodeTransition implements SingleArcTransition { @@ -526,8 +546,8 @@ public class RMNodeImpl implements RMNode, EventHandler { rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); rmNode.httpPort = newNode.getHttpPort(); rmNode.httpAddress = newNode.getHttpAddress(); - rmNode.resourceOption = newNode.getResourceOption(); - + rmNode.totalCapability = newNode.getTotalCapability(); + // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); @@ -540,9 +560,43 @@ public class RMNodeImpl implements RMNode, EventHandler { rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmNode)); + if (rmNode.getState().equals(NodeState.RUNNING)) { + // Update scheduler node's capacity for reconnect node. + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeResourceUpdateSchedulerEvent(rmNode, + ResourceOption.newInstance(rmNode.totalCapability, -1))); + } + } } + + public static class UpdateNodeResourceWhenRunningTransition + implements SingleArcTransition { + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + RMNodeResourceUpdateEvent updateEvent = (RMNodeResourceUpdateEvent)event; + updateNodeResourceFromEvent(rmNode, updateEvent); + // Notify new resourceOption to scheduler + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeResourceUpdateSchedulerEvent(rmNode, updateEvent.getResourceOption())); + } + } + + public static class UpdateNodeResourceWhenUnusableTransition + implements SingleArcTransition { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + // The node is not usable, only log a warn message + LOG.warn("Try to update resource on a "+ rmNode.getState().toString() + + " node: "+rmNode.toString()); + updateNodeResourceFromEvent(rmNode, (RMNodeResourceUpdateEvent)event); + // No need to notify scheduler as schedulerNode is not function now + // and can sync later from RMnode. + } + } + public static class CleanUpAppTransition implements SingleArcTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeResourceUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeResourceUpdateEvent.java new file mode 100644 index 00000000000..bf1f148b125 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeResourceUpdateEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceOption; + +public class RMNodeResourceUpdateEvent extends RMNodeEvent { + + private final ResourceOption resourceOption; + + public RMNodeResourceUpdateEvent(NodeId nodeId, ResourceOption resourceOption) { + super(nodeId, RMNodeEventType.RESOURCE_UPDATE); + this.resourceOption = resourceOption; + } + + public ResourceOption getResourceOption() { + return resourceOption; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index ab56bb97212..ee5dcbe7ece 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -473,4 +474,32 @@ public abstract class AbstractYarnScheduler .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL)); } } + + /** + * Process resource update on a node. + */ + public synchronized void updateNodeResource(RMNode nm, + ResourceOption resourceOption) { + + SchedulerNode node = getSchedulerNode(nm.getNodeID()); + Resource newResource = resourceOption.getResource(); + Resource oldResource = node.getTotalResource(); + if(!oldResource.equals(newResource)) { + // Log resource change + LOG.info("Update resource on node: " + node.getNodeName() + + " from: " + oldResource + ", to: " + + newResource); + + // update resource to node + node.setTotalResource(newResource); + + // update resource to clusterResource + Resources.subtractFrom(clusterResource, oldResource); + Resources.addTo(clusterResource, newResource); + } else { + // Log resource change + LOG.warn("Update resource on node: " + node.getNodeName() + + " with the same resource: " + newResource); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 7074059ecf4..f4d8731a012 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -77,6 +77,16 @@ public abstract class SchedulerNode { return this.rmNode; } + /** + * Set total resources on the node. + * @param resource total resources on the node. + */ + public synchronized void setTotalResource(Resource resource){ + this.totalResourceCapability = resource; + this.availableResource = Resources.subtract(totalResourceCapability, + this.usedResource); + } + /** * Get the ID of the node which contains both its hostname and port. * @@ -158,7 +168,7 @@ public abstract class SchedulerNode { * * @return total resources on the node. */ - public Resource getTotalResource() { + public synchronized Resource getTotalResource() { return this.totalResourceCapability; } @@ -259,19 +269,6 @@ public abstract class SchedulerNode { this.reservedContainer = reservedContainer; } - /** - * Apply delta resource on node's available resource. - * - * @param deltaResource - * the delta of resource need to apply to node - */ - public synchronized void - applyDeltaOnAvailableResource(Resource deltaResource) { - // we can only adjust available resource if total resource is changed. - Resources.addTo(this.availableResource, deltaResource); - } - - public synchronized void recoverContainer(RMContainer rmContainer) { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index d3df93fcc6e..ac37c2f0bc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.List; -import org.apache.commons.logging.Log; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; @@ -147,42 +146,6 @@ public class SchedulerUtils { ask.setCapability(normalized); } - /** - * Update resource in SchedulerNode if any resource change in RMNode. - * @param node SchedulerNode with old resource view - * @param rmNode RMNode with new resource view - * @param clusterResource the cluster's resource that need to update - * @param log Scheduler's log for resource change - * @return true if the resources have changed - */ - public static boolean updateResourceIfChanged(SchedulerNode node, - RMNode rmNode, Resource clusterResource, Log log) { - boolean result = false; - Resource oldAvailableResource = node.getAvailableResource(); - Resource newAvailableResource = Resources.subtract( - rmNode.getTotalCapability(), node.getUsedResource()); - - if (!newAvailableResource.equals(oldAvailableResource)) { - result = true; - Resource deltaResource = Resources.subtract(newAvailableResource, - oldAvailableResource); - // Reflect resource change to scheduler node. - node.applyDeltaOnAvailableResource(deltaResource); - // Reflect resource change to clusterResource. - Resources.addTo(clusterResource, deltaResource); - // TODO process resource over-commitment case (allocated containers - // > total capacity) in different option by getting value of - // overCommitTimeoutMillis. - - // Log resource change - log.info("Resource change on node: " + rmNode.getNodeAddress() - + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: " - + deltaResource.getMemory() +"MB"); - } - - return result; - } - /** * Utility method to normalize a list of resource requests, by insuring that * the memory for each request is a multiple of minMemory and is not zero. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c8a73bfb530..a8ef94224b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -50,6 +50,8 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -82,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -866,12 +869,6 @@ public class CapacityScheduler extends FiCaSchedulerNode node = getNode(nm.getNodeID()); - // Update resource if any change - if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, - LOG)) { - root.updateClusterResource(clusterResource); - } - List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); List completedContainers = new ArrayList(); @@ -899,6 +896,15 @@ public class CapacityScheduler extends + " availableResource: " + node.getAvailableResource()); } } + + /** + * Process resource update on a node. + */ + private synchronized void updateNodeAndQueueResource(RMNode nm, + ResourceOption resourceOption) { + updateNodeResource(nm, resourceOption); + root.updateClusterResource(clusterResource); + } private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { @@ -969,6 +975,14 @@ public class CapacityScheduler extends removeNode(nodeRemovedEvent.getRemovedRMNode()); } break; + case NODE_RESOURCE_UPDATE: + { + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent)event; + updateNodeAndQueueResource(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + } + break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeResourceUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeResourceUpdateSchedulerEvent.java new file mode 100644 index 00000000000..df32b283f6a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeResourceUpdateSchedulerEvent.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + +public class NodeResourceUpdateSchedulerEvent extends SchedulerEvent { + + private final RMNode rmNode; + private final ResourceOption resourceOption; + + public NodeResourceUpdateSchedulerEvent(RMNode rmNode, + ResourceOption resourceOption) { + super(SchedulerEventType.NODE_RESOURCE_UPDATE); + this.rmNode = rmNode; + this.resourceOption = resourceOption; + } + + public RMNode getRMNode() { + return rmNode; + } + + public ResourceOption getResourceOption() { + return resourceOption; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 243c72ba676..062f831c4ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -24,6 +24,7 @@ public enum SchedulerEventType { NODE_ADDED, NODE_REMOVED, NODE_UPDATE, + NODE_RESOURCE_UPDATE, // Source: RMApp APP_ADDED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 40c72a621e7..6932a315959 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -79,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -956,7 +958,7 @@ public class FairScheduler extends allocation.getNMTokenList()); } } - + /** * Process a heartbeat update from a node. */ @@ -967,9 +969,6 @@ public class FairScheduler extends } eventLog.log("HEARTBEAT", nm.getHostName()); FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); - - // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG); List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); @@ -1173,6 +1172,15 @@ public class FairScheduler extends removeApplication(appRemovedEvent.getApplicationID(), appRemovedEvent.getFinalState()); break; + case NODE_RESOURCE_UPDATE: + if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent)event; + updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + break; case APP_ATTEMPT_ADDED: if (!(event instanceof AppAttemptAddedSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); @@ -1534,4 +1542,16 @@ public class FairScheduler extends } return queue1; // names are identical } + + /** + * Process resource update on a node and update Queue. + */ + @Override + public synchronized void updateNodeResource(RMNode nm, + ResourceOption resourceOption) { + super.updateNodeResource(nm, resourceOption); + updateRootQueueMetrics(); + queueMgr.getRootQueue().setSteadyFairShare(clusterResource); + queueMgr.getRootQueue().recomputeSteadyShares(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index dd2ea433ebb..d72e7966064 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -681,9 +682,6 @@ public class FifoScheduler extends private synchronized void nodeUpdate(RMNode rmNode) { FiCaSchedulerNode node = getNode(rmNode.getNodeID()); - // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, rmNode, clusterResource, LOG); - List containerInfoList = rmNode.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); List completedContainers = new ArrayList(); @@ -750,6 +748,14 @@ public class FifoScheduler extends removeNode(nodeRemovedEvent.getRemovedRMNode()); } break; + case NODE_RESOURCE_UPDATE: + { + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent)event; + updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + } + break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 8ef01d998d7..79f909806ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -94,14 +93,14 @@ public class MockNodes { private String nodeAddr; private String httpAddress; private int cmdPort; - private ResourceOption perNode; + private Resource perNode; private String rackName; private String healthReport; private long lastHealthReportTime; private NodeState state; public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, - ResourceOption perNode, String rackName, String healthReport, + Resource perNode, String rackName, String healthReport, long lastHealthReportTime, int cmdPort, String hostName, NodeState state) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; @@ -147,7 +146,7 @@ public class MockNodes { @Override public Resource getTotalCapability() { - return this.perNode.getResource(); + return this.perNode; } @Override @@ -203,16 +202,6 @@ public class MockNodes { public long getLastHealthReportTime() { return lastHealthReportTime; } - - @Override - public void setResourceOption(ResourceOption resourceOption) { - this.perNode = resourceOption; - } - - @Override - public ResourceOption getResourceOption(){ - return this.perNode; - } }; @@ -232,9 +221,8 @@ public class MockNodes { final String httpAddress = httpAddr; String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; - return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, - ResourceOption.newInstance(perNode, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), - rackName, healthReport, 0, nid, hostName, state); + return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, + rackName, healthReport, 0, nid, hostName, state); } public static RMNode nodeInfo(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index a7f624029b2..3817637676b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -457,7 +457,6 @@ public class MockRM extends ResourceManager { @Override protected ResourceTrackerService createResourceTrackerService() { - Configuration conf = new Configuration(); RMContainerTokenSecretManager containerTokenSecretManager = getRMContext().getContainerTokenSecretManager(); @@ -547,6 +546,10 @@ public class MockRM extends ResourceManager { public RMAppManager getRMAppManager() { return this.rmAppManager; } + + public AdminService getAdminService() { + return this.adminService; + } @Override protected void startWepApp() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index 420fc942ae2..12f7498b7b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -23,7 +23,9 @@ import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,9 +37,13 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -509,6 +515,85 @@ public class TestFifoScheduler { rm.stop(); } + @Test + public void testResourceOverCommit() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB); + + RMApp app1 = rm.submitApp(2048); + // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1 + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( + nm1.getNodeId()); + // check node report, 2 GB used and 2 GB available + Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory()); + + // add request for containers + am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler, 2 GB given to AM1, resource remaining 0 + nm1.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(1000); + alloc1Response = am1.schedule(); + } + + List allocated1 = alloc1Response.getAllocatedContainers(); + Assert.assertEquals(1, allocated1.size()); + Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory()); + Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId()); + + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + // check node report, 4 GB used and 0 GB available + Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory()); + Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory()); + + // check container is assigned with 2 GB. + Container c1 = allocated1.get(0); + Assert.assertEquals(2 * GB, c1.getResource().getMemory()); + + // update node resource to 2 GB, so resource is over-consumed. + Map nodeResourceMap = + new HashMap(); + nodeResourceMap.put(nm1.getNodeId(), + ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1)); + UpdateNodeResourceRequest request = + UpdateNodeResourceRequest.newInstance(nodeResourceMap); + AdminService as = rm.adminService; + as.updateNodeResource(request); + + // Now, the used resource is still 4 GB, and available resource is minus value. + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemory()); + + // Check container can complete successfully in case of resource over-commitment. + ContainerStatus containerStatus = BuilderUtils.newContainerStatus( + c1.getId(), ContainerState.COMPLETE, "", 0); + nm1.containerStatus(containerStatus); + int waitCount = 0; + while (attempt1.getJustFinishedContainers().size() < 1 + && waitCount++ != 20) { + LOG.info("Waiting for containers to be finished for app 1... Tried " + + waitCount + " times already.."); + Thread.sleep(100); + } + Assert.assertEquals(1, attempt1.getJustFinishedContainers().size()); + Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size()); + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory()); + // As container return 2 GB back, the available resource becomes 0 again. + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory()); + rm.stop(); + } public static void main(String[] args) throws Exception { TestFifoScheduler t = new TestFifoScheduler(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index aa2cfc2eba1..d877e25c2d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -463,8 +465,7 @@ public class TestRMNodeTransitions { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, - null, ResourceOption.newInstance(capability, - RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion); + null, capability, nmVersion); node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; @@ -486,6 +487,25 @@ public class TestRMNodeTransitions { RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); return node; } + + private RMNodeImpl getNewNode(Resource capability) { + NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); + RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, + capability, null); + return node; + } + + private RMNodeImpl getRebootedNode() { + NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); + Resource capability = Resource.newInstance(4096, 4); + RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, + null, capability, null); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + Assert.assertEquals(NodeState.RUNNING, node.getState()); + node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING)); + Assert.assertEquals(NodeState.REBOOTED, node.getState()); + return node; + } @Test public void testAdd() { @@ -534,6 +554,57 @@ public class TestRMNodeTransitions { Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, nodesListManagerEvent.getType()); } + + @Test + public void testResourceUpdateOnRunningNode() { + RMNodeImpl node = getRunningNode(); + Resource oldCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096); + assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4); + node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), + ResourceOption.newInstance(Resource.newInstance(2048, 2), + RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + Resource newCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048); + assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2); + + Assert.assertEquals(NodeState.RUNNING, node.getState()); + Assert.assertNotNull(nodesListManagerEvent); + Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + nodesListManagerEvent.getType()); + } + + @Test + public void testResourceUpdateOnNewNode() { + RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4)); + Resource oldCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096); + assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4); + node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), + ResourceOption.newInstance(Resource.newInstance(2048, 2), + RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + Resource newCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048); + assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2); + + Assert.assertEquals(NodeState.NEW, node.getState()); + } + + @Test + public void testResourceUpdateOnRebootedNode() { + RMNodeImpl node = getRebootedNode(); + Resource oldCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096); + assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4); + node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), + ResourceOption.newInstance(Resource.newInstance(2048, 2), + RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + Resource newCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048); + assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2); + + Assert.assertEquals(NodeState.REBOOTED, node.getState()); + } @Test public void testReconnnectUpdate() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index cced69aea36..d16d5510365 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; +import java.util.ArrayList; +import java.util.List; + import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; @@ -47,14 +50,14 @@ public class TestNMReconnect { private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private RMNodeEvent rmNodeEvent = null; + private List rmNodeEvents = new ArrayList(); private class TestRMNodeEventDispatcher implements EventHandler { @Override public void handle(RMNodeEvent event) { - rmNodeEvent = event; + rmNodeEvents.add(event); } } @@ -109,16 +112,18 @@ public class TestNMReconnect { request1.setResource(capability); resourceTrackerService.registerNodeManager(request1); - Assert.assertEquals(RMNodeEventType.STARTED, rmNodeEvent.getType()); + Assert.assertEquals(RMNodeEventType.STARTED, rmNodeEvents.get(0).getType()); - rmNodeEvent = null; + rmNodeEvents.clear(); resourceTrackerService.registerNodeManager(request1); - Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvent.getType()); + Assert.assertEquals(RMNodeEventType.RECONNECTED, + rmNodeEvents.get(0).getType()); - rmNodeEvent = null; + rmNodeEvents.clear(); resourceTrackerService.registerNodeManager(request1); capability = BuilderUtils.newResource(1024, 2); request1.setResource(capability); - Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvent.getType()); + Assert.assertEquals(RMNodeEventType.RECONNECTED, + rmNodeEvents.get(0).getType()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index f64bd62e078..e029749f1db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -47,23 +47,30 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -90,6 +97,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -591,7 +599,6 @@ public class TestCapacityScheduler { return result; } - @SuppressWarnings("resource") @Test public void testBlackListNodes() throws Exception { Configuration conf = new Configuration(); @@ -627,6 +634,104 @@ public class TestCapacityScheduler { Assert.assertFalse(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); rm.stop(); } + + @Test + public void testResourceOverCommit() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB); + RMApp app1 = rm.submitApp(2048); + // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1 + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( + nm1.getNodeId()); + // check node report, 2 GB used and 2 GB available + Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory()); + + // add request for containers + am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler, 2 GB given to AM1, resource remaining 0 + nm1.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + alloc1Response = am1.schedule(); + } + + List allocated1 = alloc1Response.getAllocatedContainers(); + Assert.assertEquals(1, allocated1.size()); + Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory()); + Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId()); + + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + // check node report, 4 GB used and 0 GB available + Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory()); + Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory()); + + // check container is assigned with 2 GB. + Container c1 = allocated1.get(0); + Assert.assertEquals(2 * GB, c1.getResource().getMemory()); + + // update node resource to 2 GB, so resource is over-consumed. + Map nodeResourceMap = + new HashMap(); + nodeResourceMap.put(nm1.getNodeId(), + ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1)); + UpdateNodeResourceRequest request = + UpdateNodeResourceRequest.newInstance(nodeResourceMap); + AdminService as = ((MockRM)rm).getAdminService(); + as.updateNodeResource(request); + + // Now, the used resource is still 4 GB, and available resource is minus value. + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemory()); + + // Check container can complete successfully in case of resource over-commitment. + ContainerStatus containerStatus = BuilderUtils.newContainerStatus( + c1.getId(), ContainerState.COMPLETE, "", 0); + nm1.containerStatus(containerStatus); + int waitCount = 0; + while (attempt1.getJustFinishedContainers().size() < 1 + && waitCount++ != 20) { + LOG.info("Waiting for containers to be finished for app 1... Tried " + + waitCount + " times already.."); + Thread.sleep(100); + } + Assert.assertEquals(1, attempt1.getJustFinishedContainers().size()); + Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size()); + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory()); + // As container return 2 GB back, the available resource becomes 0 again. + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory()); + + // Verify no NPE is trigger in schedule after resource is updated. + am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1); + alloc1Response = am1.schedule(); + Assert.assertEquals("Shouldn't have enough resource to allocate containers", + 0, alloc1Response.getAllocatedContainers().size()); + int times = 0; + // try 10 times as scheduling is async process. + while (alloc1Response.getAllocatedContainers().size() < 1 + && times++ < 10) { + LOG.info("Waiting for containers to be allocated for app 1... Tried " + + times + " times already.."); + Thread.sleep(100); + } + Assert.assertEquals("Shouldn't have enough resource to allocate containers", + 0, alloc1Response.getAllocatedContainers().size()); + rm.stop(); + } @Test (timeout = 5000) public void testApplicationComparator() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index a0e22799290..3d383647ba4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -278,17 +279,16 @@ public class TestFifoScheduler { (Map) method.invoke(scheduler); assertEquals(schedulerNodes.values().size(), 1); - // set resource of RMNode to 1024 and verify it works. - node0.setResourceOption(ResourceOption.newInstance( - Resources.createResource(1024, 4), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)); - assertEquals(node0.getTotalCapability().getMemory(), 1024); - // verify that SchedulerNode's resource hasn't been changed. - assertEquals(schedulerNodes.get(node0.getNodeID()). - getAvailableResource().getMemory(), 2048); - // now, NM heartbeat comes. - NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); - scheduler.handle(node0Update); - // SchedulerNode's available resource is changed. + Resource newResource = Resources.createResource(1024, 4); + + NodeResourceUpdateSchedulerEvent node0ResourceUpdate = new + NodeResourceUpdateSchedulerEvent(node0, ResourceOption.newInstance( + newResource, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)); + scheduler.handle(node0ResourceUpdate); + + // SchedulerNode's total resource and available resource are changed. + assertEquals(schedulerNodes.get(node0.getNodeID()).getTotalResource() + .getMemory(), 1024); assertEquals(schedulerNodes.get(node0.getNodeID()). getAvailableResource().getMemory(), 1024); QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false); @@ -324,6 +324,7 @@ public class TestFifoScheduler { // Before the node update event, there are one local request Assert.assertEquals(1, nodeLocal.getNumContainers()); + NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); // Now schedule. scheduler.handle(node0Update); @@ -544,7 +545,6 @@ public class TestFifoScheduler { LOG.info("--- END: testFifoScheduler ---"); } - @SuppressWarnings("resource") @Test public void testBlackListNodes() throws Exception { Configuration conf = new Configuration();