YARN-879. Fixed tests w.r.t o.a.h.y.server.resourcemanager.Application. Contributed by Junping Du.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1530904 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Devarajulu K 2013-10-10 09:50:32 +00:00
parent 1c128966f3
commit 989d6fffcd
6 changed files with 127 additions and 73 deletions

View File

@ -80,6 +80,9 @@ Release 2.2.1 - UNRELEASED
so that clients don't need to do scheme-mangling. (Omkar Vinit Joshi via so that clients don't need to do scheme-mangling. (Omkar Vinit Joshi via
vinodkv) vinodkv)
YARN-879. Fixed tests w.r.t o.a.h.y.server.resourcemanager.Application.
(Junping Du via devaraj)
Release 2.2.0 - 2013-10-13 Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -34,6 +34,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@ -47,11 +49,16 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Task.State; import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@Private @Private
@ -89,16 +96,23 @@ public class Application {
Resource used = recordFactory.newRecordInstance(Resource.class); Resource used = recordFactory.newRecordInstance(Resource.class);
public Application(String user, ResourceManager resourceManager) { public Application(String user, ResourceManager resourceManager)
throws YarnException {
this(user, "default", resourceManager); this(user, "default", resourceManager);
} }
public Application(String user, String queue, ResourceManager resourceManager) { public Application(String user, String queue, ResourceManager resourceManager)
throws YarnException {
this.user = user; this.user = user;
this.queue = queue; this.queue = queue;
this.resourceManager = resourceManager; this.resourceManager = resourceManager;
this.applicationId = // register an application
this.resourceManager.getClientRMService().getNewApplicationId(); GetNewApplicationRequest request =
Records.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse newApp =
this.resourceManager.getClientRMService().getNewApplication(request);
this.applicationId = newApp.getApplicationId();
this.applicationAttemptId = this.applicationAttemptId =
ApplicationAttemptId.newInstance(this.applicationId, ApplicationAttemptId.newInstance(this.applicationId,
this.numAttempts.getAndIncrement()); this.numAttempts.getAndIncrement());
@ -116,6 +130,10 @@ public class Application {
return applicationId; return applicationId;
} }
public ApplicationAttemptId getApplicationAttemptId() {
return applicationAttemptId;
}
public static String resolve(String hostName) { public static String resolve(String hostName) {
return NetworkTopology.DEFAULT_RACK; return NetworkTopology.DEFAULT_RACK;
} }
@ -132,10 +150,25 @@ public class Application {
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
context.setApplicationId(this.applicationId); context.setApplicationId(this.applicationId);
context.setQueue(this.queue); context.setQueue(this.queue);
// Set up the container launch context for the application master
ContainerLaunchContext amContainer
= Records.newRecord(ContainerLaunchContext.class);
context.setAMContainerSpec(amContainer);
context.setResource(Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
SubmitApplicationRequest request = recordFactory SubmitApplicationRequest request = recordFactory
.newRecordInstance(SubmitApplicationRequest.class); .newRecordInstance(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(context); request.setApplicationSubmissionContext(context);
final ResourceScheduler scheduler = resourceManager.getResourceScheduler();
resourceManager.getClientRMService().submitApplication(request); resourceManager.getClientRMService().submitApplication(request);
// Notify scheduler
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
this.applicationAttemptId, this.queue, this.user);
scheduler.handle(appAddedEvent1);
} }
public synchronized void addResourceRequestSpec( public synchronized void addResourceRequestSpec(
@ -267,17 +300,13 @@ public class Application {
} }
// Get resources from the ResourceManager // Get resources from the ResourceManager
resourceManager.getResourceScheduler().allocate(applicationAttemptId, Allocation allocation = resourceManager.getResourceScheduler().allocate(
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(), null, null); applicationAttemptId, new ArrayList<ResourceRequest>(ask),
new ArrayList<ContainerId>(), null, null);
System.out.println("-=======" + applicationAttemptId); System.out.println("-=======" + applicationAttemptId);
System.out.println("----------" + resourceManager.getRMContext().getRMApps() System.out.println("----------" + resourceManager.getRMContext().getRMApps()
.get(applicationId).getRMAppAttempt(applicationAttemptId)); .get(applicationId).getRMAppAttempt(applicationAttemptId));
List<Container> containers = allocation.getContainers();
List<Container> containers = null;
// TODO: Fix
// resourceManager.getRMContext().getRMApps()
// .get(applicationId).getRMAppAttempt(applicationAttemptId)
// .pullNewlyAllocatedContainers();
// Clear state for next interaction with ResourceManager // Clear state for next interaction with ResourceManager
ask.clear(); ask.clear();

View File

@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -71,11 +70,11 @@ public class NodeManager implements ContainerManagementProtocol {
final private String rackName; final private String rackName;
final private NodeId nodeId; final private NodeId nodeId;
final private Resource capability; final private Resource capability;
final private ResourceManager resourceManager;
Resource available = recordFactory.newRecordInstance(Resource.class); Resource available = recordFactory.newRecordInstance(Resource.class);
Resource used = recordFactory.newRecordInstance(Resource.class); Resource used = recordFactory.newRecordInstance(Resource.class);
final ResourceTrackerService resourceTrackerService; final ResourceTrackerService resourceTrackerService;
final FiCaSchedulerNode schedulerNode;
final Map<ApplicationId, List<Container>> containers = final Map<ApplicationId, List<Container>> containers =
new HashMap<ApplicationId, List<Container>>(); new HashMap<ApplicationId, List<Container>>();
@ -84,15 +83,14 @@ public class NodeManager implements ContainerManagementProtocol {
public NodeManager(String hostName, int containerManagerPort, int httpPort, public NodeManager(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability, String rackName, Resource capability,
ResourceTrackerService resourceTrackerService, RMContext rmContext) ResourceManager resourceManager)
throws IOException, YarnException { throws IOException, YarnException {
this.containerManagerAddress = hostName + ":" + containerManagerPort; this.containerManagerAddress = hostName + ":" + containerManagerPort;
this.nodeHttpAddress = hostName + ":" + httpPort; this.nodeHttpAddress = hostName + ":" + httpPort;
this.rackName = rackName; this.rackName = rackName;
this.resourceTrackerService = resourceTrackerService; this.resourceTrackerService = resourceManager.getResourceTrackerService();
this.capability = capability; this.capability = capability;
Resources.addTo(available, capability); Resources.addTo(available, capability);
this.nodeId = NodeId.newInstance(hostName, containerManagerPort); this.nodeId = NodeId.newInstance(hostName, containerManagerPort);
RegisterNodeManagerRequest request = recordFactory RegisterNodeManagerRequest request = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class); .newRecordInstance(RegisterNodeManagerRequest.class);
@ -101,14 +99,8 @@ public class NodeManager implements ContainerManagementProtocol {
request.setNodeId(this.nodeId); request.setNodeId(this.nodeId);
request.setNMVersion(YarnVersionInfo.getVersion()); request.setNMVersion(YarnVersionInfo.getVersion());
resourceTrackerService.registerNodeManager(request); resourceTrackerService.registerNodeManager(request);
this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get( this.resourceManager = resourceManager;
this.nodeId), false); resourceManager.getResourceScheduler().getNodeReport(this.nodeId);
// Sanity check
Assert.assertEquals(capability.getMemory(),
schedulerNode.getAvailableResource().getMemory());
Assert.assertEquals(capability.getVirtualCores(),
schedulerNode.getAvailableResource().getVirtualCores());
} }
public String getHostName() { public String getHostName() {
@ -220,9 +212,11 @@ public class NodeManager implements ContainerManagementProtocol {
synchronized public void checkResourceUsage() { synchronized public void checkResourceUsage() {
LOG.info("Checking resource usage for " + containerManagerAddress); LOG.info("Checking resource usage for " + containerManagerAddress);
Assert.assertEquals(available.getMemory(), Assert.assertEquals(available.getMemory(),
schedulerNode.getAvailableResource().getMemory()); resourceManager.getResourceScheduler().getNodeReport(
this.nodeId).getAvailableResource().getMemory());
Assert.assertEquals(used.getMemory(), Assert.assertEquals(used.getMemory(),
schedulerNode.getUsedResource().getMemory()); resourceManager.getResourceScheduler().getNodeReport(
this.nodeId).getUsedResource().getMemory());
} }
@Override @Override
@ -232,9 +226,9 @@ public class NodeManager implements ContainerManagementProtocol {
String applicationId = String applicationId =
String.valueOf(containerID.getApplicationAttemptId() String.valueOf(containerID.getApplicationAttemptId()
.getApplicationId().getId()); .getApplicationId().getId());
// Mark the container as COMPLETE // Mark the container as COMPLETE
List<Container> applicationContainers = containers.get(applicationId); List<Container> applicationContainers = containers.get(containerID.getApplicationAttemptId()
.getApplicationId());
for (Container c : applicationContainers) { for (Container c : applicationContainers) {
if (c.getId().compareTo(containerID) == 0) { if (c.getId().compareTo(containerID) == 0) {
ContainerStatus containerStatus = containerStatusMap.get(c); ContainerStatus containerStatus = containerStatusMap.get(c);

View File

@ -34,7 +34,11 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -62,13 +66,18 @@ public class TestResourceManager {
registerNode(String hostName, int containerManagerPort, int httpPort, registerNode(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability) throws IOException, String rackName, Resource capability) throws IOException,
YarnException { YarnException {
return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
hostName, containerManagerPort, httpPort, rackName, capability, new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
resourceManager.getResourceTrackerService(), resourceManager hostName, containerManagerPort, httpPort, rackName, capability,
.getRMContext()); resourceManager);
NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext()
.getRMNodes().get(nm.getNodeId()));
resourceManager.getResourceScheduler().handle(nodeAddEvent1);
return nm;
} }
// @Test @Test
public void testResourceAllocation() throws IOException, public void testResourceAllocation() throws IOException,
YarnException { YarnException {
LOG.info("--- START: testResourceAllocation ---"); LOG.info("--- START: testResourceAllocation ---");
@ -80,14 +89,12 @@ public class TestResourceManager {
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 =
registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory, 1)); Resources.createResource(memory, 1));
nm1.heartbeat();
// Register node2 // Register node2
String host2 = "host2"; String host2 = "host2";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 =
registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory/2, 1)); Resources.createResource(memory/2, 1));
nm2.heartbeat();
// Submit an application // Submit an application
Application application = new Application("user1", resourceManager); Application application = new Application("user1", resourceManager);
@ -109,19 +116,18 @@ public class TestResourceManager {
final int memory2 = 2048; final int memory2 = 2048;
Resource capability2 = Resources.createResource(memory2, 1); Resource capability2 = Resources.createResource(memory2, 1);
Priority priority0 = Priority priority0 =
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); // higher org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); // higher
application.addResourceRequestSpec(priority0, capability2); application.addResourceRequestSpec(priority0, capability2);
// Send resource requests to the scheduler // Send resource requests to the scheduler
application.schedule(); application.schedule();
// Send a heartbeat to kick the tires on the Scheduler // Send a heartbeat to kick the tires on the Scheduler
nm1.heartbeat(); nodeUpdate(nm1);
// Get allocations from the scheduler // Get allocations from the scheduler
application.schedule(); application.schedule();
nm1.heartbeat();
checkResourceUsage(nm1, nm2); checkResourceUsage(nm1, nm2);
LOG.info("Adding new tasks..."); LOG.info("Adding new tasks...");
@ -137,18 +143,13 @@ public class TestResourceManager {
checkResourceUsage(nm1, nm2); checkResourceUsage(nm1, nm2);
// Send a heartbeat to kick the tires on the Scheduler // Send a heartbeat to kick the tires on the Scheduler
LOG.info("Sending hb from host2"); nodeUpdate(nm2);
nm2.heartbeat(); nodeUpdate(nm1);
LOG.info("Sending hb from host1");
nm1.heartbeat();
// Get allocations from the scheduler // Get allocations from the scheduler
LOG.info("Trying to allocate..."); LOG.info("Trying to allocate...");
application.schedule(); application.schedule();
nm1.heartbeat();
nm2.heartbeat();
checkResourceUsage(nm1, nm2); checkResourceUsage(nm1, nm2);
// Complete tasks // Complete tasks
@ -157,14 +158,24 @@ public class TestResourceManager {
application.finishTask(t2); application.finishTask(t2);
application.finishTask(t3); application.finishTask(t3);
// Send heartbeat // Notify scheduler application is finished.
nm1.heartbeat(); AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
nm2.heartbeat(); application.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
resourceManager.getResourceScheduler().handle(appRemovedEvent1);
checkResourceUsage(nm1, nm2); checkResourceUsage(nm1, nm2);
LOG.info("--- END: testResourceAllocation ---"); LOG.info("--- END: testResourceAllocation ---");
} }
private void nodeUpdate(
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1) {
RMNode node = resourceManager.getRMContext().getRMNodes().get(nm1.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
resourceManager.getResourceScheduler().handle(nodeUpdate);
}
@Test @Test
public void testNodeHealthReportIsNotNull() throws Exception{ public void testNodeHealthReportIsNotNull() throws Exception{
String host1 = "host1"; String host1 = "host1";

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@ -111,6 +112,8 @@ public class TestCapacityScheduler {
conf.setClass(YarnConfiguration.RM_SCHEDULER, conf.setClass(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class, ResourceScheduler.class); CapacityScheduler.class, ResourceScheduler.class);
resourceManager.init(conf); resourceManager.init(conf);
resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
resourceManager.getRMNMTokenSecretManager().rollMasterKey();
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
} }
@ -156,13 +159,18 @@ public class TestCapacityScheduler {
registerNode(String hostName, int containerManagerPort, int httpPort, registerNode(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability) String rackName, Resource capability)
throws IOException, YarnException { throws IOException, YarnException {
return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
hostName, containerManagerPort, httpPort, rackName, capability, new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
resourceManager.getResourceTrackerService(), resourceManager hostName, containerManagerPort, httpPort, rackName, capability,
.getRMContext()); resourceManager);
NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext()
.getRMNodes().get(nm.getNodeId()));
resourceManager.getResourceScheduler().handle(nodeAddEvent1);
return nm;
} }
// @Test @Test
public void testCapacityScheduler() throws Exception { public void testCapacityScheduler() throws Exception {
LOG.info("--- START: testCapacityScheduler ---"); LOG.info("--- START: testCapacityScheduler ---");
@ -172,14 +180,12 @@ public class TestCapacityScheduler {
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(4 * GB, 1)); Resources.createResource(4 * GB, 1));
nm_0.heartbeat();
// Register node2 // Register node2
String host_1 = "host_1"; String host_1 = "host_1";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 =
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(2 * GB, 1)); Resources.createResource(2 * GB, 1));
nm_1.heartbeat();
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority priority_0 =
@ -227,8 +233,12 @@ public class TestCapacityScheduler {
// Send a heartbeat to kick the tires on the Scheduler // Send a heartbeat to kick the tires on the Scheduler
LOG.info("Kick!"); LOG.info("Kick!");
nm_0.heartbeat(); // task_0_0 and task_1_0 allocated, used=4G
nm_1.heartbeat(); // nothing allocated // task_0_0 and task_1_0 allocated, used=4G
nodeUpdate(nm_0);
// nothing allocated
nodeUpdate(nm_1);
// Get allocations from the scheduler // Get allocations from the scheduler
application_0.schedule(); // task_0_0 application_0.schedule(); // task_0_0
@ -237,9 +247,6 @@ public class TestCapacityScheduler {
application_1.schedule(); // task_1_0 application_1.schedule(); // task_1_0
checkApplicationResourceUsage(3 * GB, application_1); checkApplicationResourceUsage(3 * GB, application_1);
nm_0.heartbeat();
nm_1.heartbeat();
checkNodeResourceUsage(4*GB, nm_0); // task_0_0 (1G) and task_1_0 (3G) checkNodeResourceUsage(4*GB, nm_0); // task_0_0 (1G) and task_1_0 (3G)
checkNodeResourceUsage(0*GB, nm_1); // no tasks, 2G available checkNodeResourceUsage(0*GB, nm_1); // no tasks, 2G available
@ -259,10 +266,12 @@ public class TestCapacityScheduler {
// Send a heartbeat to kick the tires on the Scheduler // Send a heartbeat to kick the tires on the Scheduler
LOG.info("Sending hb from " + nm_0.getHostName()); LOG.info("Sending hb from " + nm_0.getHostName());
nm_0.heartbeat(); // nothing new, used=4G // nothing new, used=4G
nodeUpdate(nm_0);
LOG.info("Sending hb from " + nm_1.getHostName()); LOG.info("Sending hb from " + nm_1.getHostName());
nm_1.heartbeat(); // task_0_3, used=2G // task_0_1 is prefer as locality, used=2G
nodeUpdate(nm_1);
// Get allocations from the scheduler // Get allocations from the scheduler
LOG.info("Trying to allocate..."); LOG.info("Trying to allocate...");
@ -272,14 +281,23 @@ public class TestCapacityScheduler {
application_1.schedule(); application_1.schedule();
checkApplicationResourceUsage(5 * GB, application_1); checkApplicationResourceUsage(5 * GB, application_1);
nm_0.heartbeat(); nodeUpdate(nm_0);
nm_1.heartbeat(); nodeUpdate(nm_1);
checkNodeResourceUsage(4*GB, nm_0); checkNodeResourceUsage(4*GB, nm_0);
checkNodeResourceUsage(2*GB, nm_1); checkNodeResourceUsage(2*GB, nm_1);
LOG.info("--- END: testCapacityScheduler ---"); LOG.info("--- END: testCapacityScheduler ---");
} }
private void nodeUpdate(
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm) {
RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
resourceManager.getResourceScheduler().handle(nodeUpdate);
}
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
// Define top-level queues // Define top-level queues

View File

@ -96,8 +96,7 @@ public class TestFifoScheduler {
YarnException { YarnException {
return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
hostName, containerManagerPort, nmHttpPort, rackName, capability, hostName, containerManagerPort, nmHttpPort, rackName, capability,
resourceManager.getResourceTrackerService(), resourceManager resourceManager);
.getRMContext());
} }
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {