YARN-3992. TestApplicationPriority.testApplicationPriorityAllocation fails intermittently. (Contributed by Sunil G)

(cherry picked from commit df9e7280db)
This commit is contained in:
rohithsharmaks 2015-08-06 10:43:37 +05:30
parent 5950c1f6f8
commit dc76c4b035
3 changed files with 29 additions and 57 deletions

View File

@ -680,6 +680,9 @@ Release 2.8.0 - UNRELEASED
YARN-433. When RM is catching up with node updates then it should not expire YARN-433. When RM is catching up with node updates then it should not expire
acquired containers. (Xuan Gong via zxu) acquired containers. (Xuan Gong via zxu)
YARN-3992. TestApplicationPriority.testApplicationPriorityAllocation fails
intermittently. (Contributed by Sunil G)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -309,16 +309,20 @@ public class MockAM {
public ApplicationAttemptId getApplicationAttemptId() { public ApplicationAttemptId getApplicationAttemptId() {
return this.attemptId; return this.attemptId;
} }
public List<Container> allocateAndWaitForContainers(int nContainer, public List<Container> allocateAndWaitForContainers(int nContainer,
int memory, MockNM nm) throws Exception { int memory, MockNM nm) throws Exception {
return allocateAndWaitForContainers("ANY", nContainer, memory, nm);
}
public List<Container> allocateAndWaitForContainers(String host,
int nContainer, int memory, MockNM nm) throws Exception {
// AM request for containers // AM request for containers
allocate("ANY", memory, nContainer, null); allocate(host, memory, nContainer, null);
// kick the scheduler // kick the scheduler
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
List<Container> conts = List<Container> conts = allocate(new ArrayList<ResourceRequest>(), null)
allocate(new ArrayList<ResourceRequest>(), null) .getAllocatedContainers();
.getAllocatedContainers();
while (conts.size() < nContainer) { while (conts.size() < nContainer) {
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
conts.addAll(allocate(new ArrayList<ResourceRequest>(), conts.addAll(allocate(new ArrayList<ResourceRequest>(),

View File

@ -22,20 +22,16 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@ -44,7 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
@ -59,8 +54,6 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestApplicationPriority { public class TestApplicationPriority {
private static final Log LOG = LogFactory
.getLog(TestApplicationPriority.class);
private final int GB = 1024; private final int GB = 1024;
private YarnConfiguration conf; private YarnConfiguration conf;
@ -166,19 +159,10 @@ public class TestApplicationPriority {
MockAM am1 = MockRM.launchAM(app1, rm, nm1); MockAM am1 = MockRM.launchAM(app1, rm, nm1);
am1.registerAppAttempt(); am1.registerAppAttempt();
// add request for containers // allocate 7 containers for App1
am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 7); List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1",
AllocateResponse alloc1Response = am1.schedule(); // send the request 7, 2 * GB, nm1);
// kick the scheduler, 7 containers will be allocated for App1
nm1.nodeHeartbeat(true);
while (alloc1Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
alloc1Response = am1.schedule();
}
List<Container> allocated1 = alloc1Response.getAllocatedContainers();
Assert.assertEquals(7, allocated1.size()); Assert.assertEquals(7, allocated1.size());
Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory()); Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
@ -193,9 +177,7 @@ public class TestApplicationPriority {
RMApp app2 = rm.submitApp(1 * GB, appPriority2); RMApp app2 = rm.submitApp(1 * GB, appPriority2);
// kick the scheduler, 1 GB which was free is given to AM of App2 // kick the scheduler, 1 GB which was free is given to AM of App2
nm1.nodeHeartbeat(true); MockAM am2 = MockRM.launchAM(app2, rm, nm1);
MockAM am2 = rm.sendAMLaunched(app2.getCurrentAppAttempt()
.getAppAttemptId());
am2.registerAppAttempt(); am2.registerAppAttempt();
// check node report, 16 GB used and 0 GB available // check node report, 16 GB used and 0 GB available
@ -210,7 +192,7 @@ public class TestApplicationPriority {
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
.get(app1.getApplicationId()).getCurrentAppAttempt(); .get(app1.getApplicationId()).getCurrentAppAttempt();
// kill 2 containers to free up some space // kill 2 containers of App1 to free up some space
int counter = 0; int counter = 0;
for (Container c : allocated1) { for (Container c : allocated1) {
if (++counter > 2) { if (++counter > 2) {
@ -224,22 +206,16 @@ public class TestApplicationPriority {
Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory()); Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory()); Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory());
// add request for containers App1 // send updated request for App1
am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 10); am1.allocate("127.0.0.1", 2 * GB, 10, new ArrayList<ContainerId>());
am1.schedule(); // send the request for App1
// add request for containers App2
am2.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 3);
AllocateResponse alloc1Response4 = am2.schedule(); // send the request
// kick the scheduler, since App2 priority is more than App1, it will get // kick the scheduler, since App2 priority is more than App1, it will get
// remaining cluster space. // remaining cluster space.
nm1.nodeHeartbeat(true); List<Container> allocated2 = am2.allocateAndWaitForContainers("127.0.0.1",
while (alloc1Response4.getAllocatedContainers().size() < 1) { 2, 2 * GB, nm1);
LOG.info("Waiting for containers to be created for app 2...");
Thread.sleep(100); // App2 has got 2 containers now.
alloc1Response4 = am2.schedule(); Assert.assertEquals(2, allocated2.size());
}
// check node report, 16 GB used and 0 GB available // check node report, 16 GB used and 0 GB available
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
@ -268,19 +244,10 @@ public class TestApplicationPriority {
MockAM am1 = MockRM.launchAM(app1, rm, nm1); MockAM am1 = MockRM.launchAM(app1, rm, nm1);
am1.registerAppAttempt(); am1.registerAppAttempt();
// add request for containers
am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 1 * GB, 1, 7);
AllocateResponse alloc1Response = am1.schedule(); // send the request
// kick the scheduler, 7 containers will be allocated for App1 // kick the scheduler, 7 containers will be allocated for App1
nm1.nodeHeartbeat(true); List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1",
while (alloc1Response.getAllocatedContainers().size() < 1) { 7, 1 * GB, nm1);
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
alloc1Response = am1.schedule();
}
List<Container> allocated1 = alloc1Response.getAllocatedContainers();
Assert.assertEquals(7, allocated1.size()); Assert.assertEquals(7, allocated1.size());
Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory()); Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
@ -308,9 +275,7 @@ public class TestApplicationPriority {
rm.killApp(app1.getApplicationId()); rm.killApp(app1.getApplicationId());
// kick the scheduler, app3 (high among pending) gets free space // kick the scheduler, app3 (high among pending) gets free space
nm1.nodeHeartbeat(true); MockAM am3 = MockRM.launchAM(app3, rm, nm1);
MockAM am3 = rm.sendAMLaunched(app3.getCurrentAppAttempt()
.getAppAttemptId());
am3.registerAppAttempt(); am3.registerAppAttempt();
// check node report, 1 GB used and 7 GB available // check node report, 1 GB used and 7 GB available