YARN-576. Modified ResourceManager to reject NodeManagers that don't satisy minimum resource requirements. Contributed by Kenji Kikushima.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1476824 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-28 20:03:37 +00:00
parent 3e00835dad
commit d9ce42479e
6 changed files with 76 additions and 7 deletions

View File

@ -312,6 +312,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-289. Fair scheduler allows reservations that won't fit on node. YARN-289. Fair scheduler allows reservations that won't fit on node.
(Sandy Ryza via tomwhite) (Sandy Ryza via tomwhite)
YARN-576. Modified ResourceManager to reject NodeManagers that don't satisy
minimum resource requirements. (Kenji Kikushima via vinodkv)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -78,6 +78,9 @@ public class ResourceTrackerService extends AbstractService implements
private static final NodeHeartbeatResponse shutDown = recordFactory private static final NodeHeartbeatResponse shutDown = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class); .newRecordInstance(NodeHeartbeatResponse.class);
private int minAllocMb;
private int minAllocVcores;
static { static {
resync.setNodeAction(NodeAction.RESYNC); resync.setNodeAction(NodeAction.RESYNC);
@ -111,6 +114,14 @@ public class ResourceTrackerService extends AbstractService implements
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
+ " should be larger than 0."); + " should be larger than 0.");
} }
minAllocMb = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
minAllocVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
super.init(conf); super.init(conf);
} }
@ -169,6 +180,16 @@ public class ResourceTrackerService extends AbstractService implements
return response; return response;
} }
// Check if this node has minimum allocations
if (capability.getMemory() < minAllocMb
|| capability.getVirtualCores() < minAllocVcores) {
LOG.info("NodeManager from " + host
+ " doesn't satisfy minimum allocations, Sending SHUTDOWN"
+ " signal to the NodeManager.");
response.setNodeAction(NodeAction.SHUTDOWN);
return response;
}
if (isSecurityEnabled()) { if (isSecurityEnabled()) {
MasterKey nextMasterKeyForNode = MasterKey nextMasterKeyForNode =
this.containerTokenSecretManager.getCurrentKey(); this.containerTokenSecretManager.getCurrentKey();

View File

@ -45,6 +45,7 @@ public class MockNM {
private int responseId; private int responseId;
private NodeId nodeId; private NodeId nodeId;
private final int memory; private final int memory;
private final int vCores = 1;
private ResourceTrackerService resourceTracker; private ResourceTrackerService resourceTracker;
private final int httpPort = 2; private final int httpPort = 2;
private MasterKey currentMasterKey; private MasterKey currentMasterKey;
@ -53,9 +54,7 @@ public class MockNM {
this.memory = memory; this.memory = memory;
this.resourceTracker = resourceTracker; this.resourceTracker = resourceTracker;
String[] splits = nodeIdStr.split(":"); String[] splits = nodeIdStr.split(":");
nodeId = Records.newRecord(NodeId.class); nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1]));
nodeId.setHost(splits[0]);
nodeId.setPort(Integer.parseInt(splits[1]));
} }
public NodeId getNodeId() { public NodeId getNodeId() {
@ -83,8 +82,7 @@ public class MockNM {
RegisterNodeManagerRequest.class); RegisterNodeManagerRequest.class);
req.setNodeId(nodeId); req.setNodeId(nodeId);
req.setHttpPort(httpPort); req.setHttpPort(httpPort);
Resource resource = Records.newRecord(Resource.class); Resource resource = BuilderUtils.newResource(memory, vCores);
resource.setMemory(memory);
req.setResource(resource); req.setResource(resource);
RegisterNodeManagerResponse registrationResponse = RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req); resourceTracker.registerNodeManager(req);

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
@ -283,6 +285,49 @@ public class TestResourceTrackerService {
response.getRMIdentifier()); response.getRMIdentifier());
} }
@Test
public void testNodeRegistrationWithMinimumAllocations() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "2048");
conf.set(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "4");
rm = new MockRM(conf);
rm.start();
ResourceTrackerService resourceTrackerService
= rm.getResourceTrackerService();
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
NodeId nodeId = BuilderUtils.newNodeId("host", 1234);
req.setNodeId(nodeId);
Resource capability = BuilderUtils.newResource(1024, 1);
req.setResource(capability);
RegisterNodeManagerResponse response1 =
resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response1.getNodeAction());
capability.setMemory(2048);
capability.setVirtualCores(1);
req.setResource(capability);
RegisterNodeManagerResponse response2 =
resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response2.getNodeAction());
capability.setMemory(1024);
capability.setVirtualCores(4);
req.setResource(capability);
RegisterNodeManagerResponse response3 =
resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response3.getNodeAction());
capability.setMemory(2048);
capability.setVirtualCores(4);
req.setResource(capability);
RegisterNodeManagerResponse response4 =
resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.NORMAL,response4.getNodeAction());
}
@Test @Test
public void testReboot() throws Exception { public void testReboot() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -127,7 +128,7 @@ public class TestNMExpiry {
String hostname1 = "localhost1"; String hostname1 = "localhost1";
String hostname2 = "localhost2"; String hostname2 = "localhost2";
String hostname3 = "localhost3"; String hostname3 = "localhost3";
Resource capability = recordFactory.newRecordInstance(Resource.class); Resource capability = BuilderUtils.newResource(1024, 1);
RegisterNodeManagerRequest request1 = recordFactory RegisterNodeManagerRequest request1 = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class); .newRecordInstance(RegisterNodeManagerRequest.class);

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -89,7 +90,7 @@ public class TestRMNMRPCResponseId {
@Test @Test
public void testRPCResponseId() throws IOException { public void testRPCResponseId() throws IOException {
String node = "localhost"; String node = "localhost";
Resource capability = recordFactory.newRecordInstance(Resource.class); Resource capability = BuilderUtils.newResource(1024, 1);
RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
nodeId = Records.newRecord(NodeId.class); nodeId = Records.newRecord(NodeId.class);
nodeId.setHost(node); nodeId.setHost(node);