YARN-311. RM/scheduler support for dynamic resource configuration. (Junping Du via llu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1539132 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Luke Lu 2013-11-05 21:20:05 +00:00
parent b58d5bad64
commit 9b1122e0eb
19 changed files with 410 additions and 29 deletions

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -48,7 +49,7 @@ public class NodeInfo {
private String nodeAddr; private String nodeAddr;
private String httpAddress; private String httpAddress;
private int cmdPort; private int cmdPort;
private Resource perNode; private volatile ResourceOption perNode;
private String rackName; private String rackName;
private String healthReport; private String healthReport;
private NodeState state; private NodeState state;
@ -56,7 +57,7 @@ public class NodeInfo {
private List<ApplicationId> toCleanUpApplications; private List<ApplicationId> toCleanUpApplications;
public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, String healthReport, ResourceOption perNode, String rackName, String healthReport,
int cmdPort, String hostName, NodeState state) { int cmdPort, String hostName, NodeState state) {
this.nodeId = nodeId; this.nodeId = nodeId;
this.nodeAddr = nodeAddr; this.nodeAddr = nodeAddr;
@ -104,6 +105,10 @@ public class NodeInfo {
} }
public Resource getTotalCapability() { public Resource getTotalCapability() {
return perNode.getResource();
}
public ResourceOption getResourceOption() {
return perNode; return perNode;
} }
@ -153,21 +158,27 @@ public class NodeInfo {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }
@Override
public void setResourceOption(ResourceOption resourceOption) {
perNode = resourceOption;
}
} }
public static RMNode newNodeInfo(String rackName, String hostName, public static RMNode newNodeInfo(String rackName, String hostName,
final Resource resource, int port) { final ResourceOption resourceOption, int port) {
final NodeId nodeId = newNodeID(hostName, port); final NodeId nodeId = newNodeID(hostName, port);
final String nodeAddr = hostName + ":" + port; final String nodeAddr = hostName + ":" + port;
final String httpAddress = hostName; final String httpAddress = hostName;
return new FakeRMNodeImpl(nodeId, nodeAddr, httpAddress, return new FakeRMNodeImpl(nodeId, nodeAddr, httpAddress,
resource, rackName, "Me good", resourceOption, rackName, "Me good",
port, hostName, null); port, hostName, null);
} }
public static RMNode newNodeInfo(String rackName, String hostName, public static RMNode newNodeInfo(String rackName, String hostName,
final Resource resource) { final Resource resource) {
return newNodeInfo(rackName, hostName, resource, NODE_ID++); return newNodeInfo(rackName, hostName, ResourceOption.newInstance(resource,
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), NODE_ID++);
} }
} }

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -138,10 +139,19 @@ public class RMNodeWrapper implements RMNode {
return updates; return updates;
} }
@Override @Override
public String getNodeManagerVersion() { public String getNodeManagerVersion() {
// TODO Auto-generated method stub return node.getNodeManagerVersion();
return null; }
}
@Override
public void setResourceOption(ResourceOption resourceOption) {
node.setResourceOption(resourceOption);
}
@Override
public ResourceOption getResourceOption() {
return node.getResourceOption();
}
} }

View File

@ -19,6 +19,9 @@ Release 2.3.0 - UNRELEASED
YARN-1068. Add admin support for HA operations (Karthik Kambatla via YARN-1068. Add admin support for HA operations (Karthik Kambatla via
bikas) bikas)
YARN-311. RM/scheduler support for dynamic resource configuration.
(Junping Du via llu)
IMPROVEMENTS IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu) YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.util.Records;
@Public
@Evolving
public abstract class ResourceOption {
public static ResourceOption newInstance(Resource resource,
int overCommitTimeout){
ResourceOption resourceOption = Records.newRecord(ResourceOption.class);
resourceOption.setResource(resource);
return resourceOption;
}
/**
* Get the <em>resource</em> of the ResourceOption.
* @return <em>resource</em> of the ResourceOption
*/
@Private
@Evolving
public abstract Resource getResource();
@Private
@Evolving
protected abstract void setResource(Resource resource);
/**
* Get timeout for tolerant of resource over-commitment
* Note: negative value means no timeout so that allocated containers will
* keep running until the end even under resource over-commitment cases.
* @return <em>overCommitTimeout</em> of the ResourceOption
*/
@Private
@Evolving
public abstract int getOverCommitTimeout();
@Private
@Evolving
protected abstract void setOverCommitTimeout(int overCommitTimeout);
protected abstract void build();
}

View File

@ -58,6 +58,11 @@ message ResourceProto {
optional int32 virtual_cores = 2; optional int32 virtual_cores = 2;
} }
message ResourceOptionProto {
optional ResourceProto resource = 1;
optional int32 over_commit_timeout = 2;
}
message PriorityProto { message PriorityProto {
optional int32 priority = 1; optional int32 priority = 1;
} }

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto;
import com.google.common.base.Preconditions;
public class ResourceOptionPBImpl extends ResourceOption {
ResourceOptionProto proto = null;
ResourceOptionProto.Builder builder = null;
private Resource resource = null;
public ResourceOptionPBImpl() {
builder = ResourceOptionProto.newBuilder();
}
public ResourceOptionPBImpl(ResourceOptionProto proto) {
this.proto = proto;
this.resource = convertFromProtoFormat(proto.getResource());
}
public ResourceOptionProto getProto() {
return proto;
}
@Override
public Resource getResource() {
return this.resource;
}
@Override
protected void setResource(Resource resource) {
if (resource != null) {
Preconditions.checkNotNull(builder);
builder.setResource(convertToProtoFormat(resource));
}
this.resource = resource;
}
@Override
public int getOverCommitTimeout() {
Preconditions.checkNotNull(proto);
return proto.getOverCommitTimeout();
}
@Override
protected void setOverCommitTimeout(int overCommitTimeout) {
Preconditions.checkNotNull(builder);
builder.setOverCommitTimeout(overCommitTimeout);
}
private ResourceProto convertToProtoFormat(
Resource resource) {
return ((ResourcePBImpl)resource).getProto();
}
private ResourcePBImpl convertFromProtoFormat(
ResourceProto p) {
return new ResourcePBImpl(p);
}
@Override
protected void build() {
proto = builder.build();
builder = null;
}
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -234,7 +235,8 @@ public class ResourceTrackerService extends AbstractService implements
.getCurrentKey()); .getCurrentKey());
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
resolve(host), capability, nodeManagerVersion); resolve(host), ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT),
nodeManagerVersion);
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) { if (oldNode == null) {

View File

@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
/** /**
@ -35,6 +37,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
*/ */
public interface RMNode { public interface RMNode {
/** negative value means no timeout */
public static final int OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT = -1;
/** /**
* the node id of of this node. * the node id of of this node.
* @return the node id of this node. * @return the node id of this node.
@ -94,7 +99,19 @@ public interface RMNode {
* the total available resource. * the total available resource.
* @return the total available resource. * @return the total available resource.
*/ */
public org.apache.hadoop.yarn.api.records.Resource getTotalCapability(); public Resource getTotalCapability();
/**
* Set resource option with total available resource and overCommitTimoutMillis
* @param resourceOption
*/
public void setResourceOption(ResourceOption resourceOption);
/**
* resource option with total available resource and overCommitTimoutMillis
* @return ResourceOption
*/
public ResourceOption getResourceOption();
/** /**
* The rack name for this node manager. * The rack name for this node manager.

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -92,7 +93,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private final int httpPort; private final int httpPort;
private final String nodeAddress; // The containerManager address private final String nodeAddress; // The containerManager address
private final String httpAddress; private final String httpAddress;
private final Resource totalCapability; private volatile ResourceOption resourceOption;
private final Node node; private final Node node;
private String healthReport; private String healthReport;
@ -173,13 +174,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEvent> stateMachine; RMNodeEvent> stateMachine;
public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { int cmPort, int httpPort, Node node, ResourceOption resourceOption, String nodeManagerVersion) {
this.nodeId = nodeId; this.nodeId = nodeId;
this.context = context; this.context = context;
this.hostName = hostName; this.hostName = hostName;
this.commandPort = cmPort; this.commandPort = cmPort;
this.httpPort = httpPort; this.httpPort = httpPort;
this.totalCapability = capability; this.resourceOption = resourceOption;
this.nodeAddress = hostName + ":" + cmPort; this.nodeAddress = hostName + ":" + cmPort;
this.httpAddress = hostName + ":" + httpPort; this.httpAddress = hostName + ":" + httpPort;
this.node = node; this.node = node;
@ -235,14 +236,24 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override @Override
public Resource getTotalCapability() { public Resource getTotalCapability() {
return this.totalCapability; return this.resourceOption.getResource();
}
@Override
public void setResourceOption(ResourceOption resourceOption) {
this.resourceOption = resourceOption;
}
@Override
public ResourceOption getResourceOption(){
return this.resourceOption;
} }
@Override @Override
public String getRackName() { public String getRackName() {
return node.getNetworkLocation(); return node.getNetworkLocation();
} }
@Override @Override
public Node getNode() { public Node getNode() {
return this.node; return this.node;

View File

@ -67,6 +67,12 @@ public abstract class SchedulerNode {
* @return number of active containers on the node * @return number of active containers on the node
*/ */
public abstract int getNumContainers(); public abstract int getNumContainers();
/**
* Apply delta resource on node's available resource.
* @param deltaResource the delta of resource need to apply to node
*/
public abstract void applyDeltaOnAvailableResource(Resource deltaResource);
/** /**
* Get total resources on the node. * Get total resources on the node.

View File

@ -19,20 +19,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -147,6 +146,37 @@ public class SchedulerUtils {
maximumResource, minimumResource); maximumResource, minimumResource);
ask.setCapability(normalized); ask.setCapability(normalized);
} }
/**
* Update resource in SchedulerNode if any resource change in RMNode.
* @param node SchedulerNode with old resource view
* @param rmNode RMNode with new resource view
* @param clusterResource the cluster's resource that need to update
* @param log Scheduler's log for resource change
*/
public static void updateResourceIfChanged(SchedulerNode node,
RMNode rmNode, Resource clusterResource, Log log) {
Resource oldAvailableResource = node.getAvailableResource();
Resource newAvailableResource = Resources.subtract(
rmNode.getTotalCapability(), node.getUsedResource());
if (!newAvailableResource.equals(oldAvailableResource)) {
Resource deltaResource = Resources.subtract(newAvailableResource,
oldAvailableResource);
// Reflect resource change to scheduler node.
node.applyDeltaOnAvailableResource(deltaResource);
// Reflect resource change to clusterResource.
Resources.addTo(clusterResource, deltaResource);
// TODO process resource over-commitment case (allocated containers
// > total capacity) in different option by getting value of
// overCommitTimeoutMillis.
// Log resource change
log.info("Resource change on node: " + rmNode.getNodeAddress()
+ " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: "
+ deltaResource.getMemory() +"MB");
}
}
/** /**
* Utility method to normalize a list of resource requests, by insuring that * Utility method to normalize a list of resource requests, by insuring that

View File

@ -629,6 +629,10 @@ public class CapacityScheduler
} }
FiCaSchedulerNode node = getNode(nm.getNodeID()); FiCaSchedulerNode node = getNode(nm.getNodeID());
// Update resource if any change
SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>(); List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@ -695,7 +699,7 @@ public class CapacityScheduler
node.getReservedContainer().getContainerId().getApplicationAttemptId() node.getReservedContainer().getContainerId().getApplicationAttemptId()
); );
} }
} }
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {

View File

@ -268,4 +268,10 @@ public class FiCaSchedulerNode extends SchedulerNode {
return reservedContainer; return reservedContainer;
} }
@Override
public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
// we can only adjust available resource if total resource is changed.
Resources.addTo(this.availableResource, deltaResource);
}
} }

View File

@ -269,4 +269,11 @@ public class FSSchedulerNode extends SchedulerNode {
public synchronized AppSchedulable getReservedAppSchedulable() { public synchronized AppSchedulable getReservedAppSchedulable() {
return reservedAppSchedulable; return reservedAppSchedulable;
} }
@Override
public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
// we can only adjust available resource if total resource is changed.
Resources.addTo(this.availableResource, deltaResource);
}
} }

View File

@ -920,6 +920,9 @@ public class FairScheduler implements ResourceScheduler {
eventLog.log("HEARTBEAT", nm.getHostName()); eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = nodes.get(nm.getNodeID()); FSSchedulerNode node = nodes.get(nm.getNodeID());
// Update resource if any change
SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>(); List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();

View File

@ -100,7 +100,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY); private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
private RMContext rmContext; private RMContext rmContext;
private Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>(); protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
private boolean initialized; private boolean initialized;
private Resource minimumAllocation; private Resource minimumAllocation;
@ -628,6 +628,9 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
private synchronized void nodeUpdate(RMNode rmNode) { private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID()); FiCaSchedulerNode node = getNode(rmNode.getNodeID());
// Update resource if any change
SchedulerUtils.updateResourceIfChanged(node, rmNode, clusterResource, LOG);
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates(); List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>(); List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@ -661,7 +664,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
metrics.setAvailableResourcesToQueue( metrics.setAvailableResourcesToQueue(
Resources.subtract(clusterResource, usedResource)); Resources.subtract(clusterResource, usedResource));
} }
@Override @Override
public void handle(SchedulerEvent event) { public void handle(SchedulerEvent event) {

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -93,14 +94,14 @@ public class MockNodes {
private String nodeAddr; private String nodeAddr;
private String httpAddress; private String httpAddress;
private int cmdPort; private int cmdPort;
private Resource perNode; private ResourceOption perNode;
private String rackName; private String rackName;
private String healthReport; private String healthReport;
private long lastHealthReportTime; private long lastHealthReportTime;
private NodeState state; private NodeState state;
public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, String healthReport, ResourceOption perNode, String rackName, String healthReport,
long lastHealthReportTime, int cmdPort, String hostName, NodeState state) { long lastHealthReportTime, int cmdPort, String hostName, NodeState state) {
this.nodeId = nodeId; this.nodeId = nodeId;
this.nodeAddr = nodeAddr; this.nodeAddr = nodeAddr;
@ -146,7 +147,7 @@ public class MockNodes {
@Override @Override
public Resource getTotalCapability() { public Resource getTotalCapability() {
return this.perNode; return this.perNode.getResource();
} }
@Override @Override
@ -202,6 +203,17 @@ public class MockNodes {
public long getLastHealthReportTime() { public long getLastHealthReportTime() {
return lastHealthReportTime; return lastHealthReportTime;
} }
@Override
public void setResourceOption(ResourceOption resourceOption) {
this.perNode = resourceOption;
}
@Override
public ResourceOption getResourceOption(){
return this.perNode;
}
}; };
private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) { private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {
@ -220,8 +232,9 @@ public class MockNodes {
final String httpAddress = httpAddr; final String httpAddress = httpAddr;
String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName, return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress,
healthReport, 0, nid, hostName, state); ResourceOption.newInstance(perNode, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT),
rackName, healthReport, 0, nid, hostName, state);
} }
public static RMNode nodeInfo(int rack, final Resource perNode, public static RMNode nodeInfo(int rack, final Resource perNode,

View File

@ -36,10 +36,12 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@ -451,7 +453,8 @@ public class TestRMNodeTransitions {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
Resource capability = Resource.newInstance(4096, 4); Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, capability, null); null, ResourceOption.newInstance(capability,
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), null);
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
Assert.assertEquals(NodeState.RUNNING, node.getState()); Assert.assertEquals(NodeState.RUNNING, node.getState());
return node; return node;

View File

@ -18,10 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import static org.junit.Assert.assertEquals;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import junit.framework.Assert; import junit.framework.Assert;
@ -32,9 +36,11 @@ import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -55,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -212,6 +219,92 @@ public class TestFifoScheduler {
Assert.assertEquals(3, info.getLiveContainers().size()); Assert.assertEquals(3, info.getLiveContainers().size());
} }
@Test(timeout=2000)
public void testUpdateResourceOnNode() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
Configuration conf = new Configuration();
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
containerTokenSecretManager.rollMasterKey();
NMTokenSecretManagerInRM nmTokenSecretManager =
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null);
FifoScheduler scheduler = new FifoScheduler(){
@SuppressWarnings("unused")
public Map<NodeId, FiCaSchedulerNode> getNodes(){
return nodes;
}
};
scheduler.reinitialize(new Configuration(), rmContext);
RMNode node0 = MockNodes.newNodeInfo(1,
Resources.createResource(2048, 4), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
scheduler.handle(nodeEvent1);
Method method = scheduler.getClass().getDeclaredMethod("getNodes");
@SuppressWarnings("unchecked")
Map<NodeId, FiCaSchedulerNode> schedulerNodes =
(Map<NodeId, FiCaSchedulerNode>) method.invoke(scheduler);
assertEquals(schedulerNodes.values().size(), 1);
// set resource of RMNode to 1024 and verify it works.
node0.setResourceOption(ResourceOption.newInstance(
Resources.createResource(1024, 4), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
assertEquals(node0.getTotalCapability().getMemory(), 1024);
// verify that SchedulerNode's resource hasn't been changed.
assertEquals(schedulerNodes.get(node0.getNodeID()).
getAvailableResource().getMemory(), 2048);
// now, NM heartbeat comes.
NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
scheduler.handle(node0Update);
// SchedulerNode's available resource is changed.
assertEquals(schedulerNodes.get(node0.getNodeID()).
getAvailableResource().getMemory(), 1024);
QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity());
int _appId = 1;
int _appAttemptId = 1;
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
_appAttemptId);
AppAddedSchedulerEvent appEvent1 = new AppAddedSchedulerEvent(appAttemptId,
"queue1", "user1");
scheduler.handle(appEvent1);
int memory = 1024;
int priority = 1;
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest nodeLocal = createResourceRequest(memory,
node0.getHostName(), priority, 1);
ResourceRequest rackLocal = createResourceRequest(memory,
node0.getRackName(), priority, 1);
ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority,
1);
ask.add(nodeLocal);
ask.add(rackLocal);
ask.add(any);
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
// Before the node update event, there are one local request
Assert.assertEquals(1, nodeLocal.getNumContainers());
// Now schedule.
scheduler.handle(node0Update);
// After the node update event, check no local request
Assert.assertEquals(0, nodeLocal.getNumContainers());
// Also check that one container was scheduled
SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId);
Assert.assertEquals(1, info.getLiveContainers().size());
// And check the default Queue now is full.
queueInfo = scheduler.getQueueInfo(null, false, false);
Assert.assertEquals(1.0f, queueInfo.getCurrentCapacity());
}
// @Test // @Test
public void testFifoScheduler() throws Exception { public void testFifoScheduler() throws Exception {