YARN-3992. TestApplicationPriority.testApplicationPriorityAllocation fails intermittently. (Contributed by Sunil G)
This commit is contained in:
parent
f59612edd7
commit
df9e7280db
|
@ -732,6 +732,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-433. When RM is catching up with node updates then it should not expire
|
YARN-433. When RM is catching up with node updates then it should not expire
|
||||||
acquired containers. (Xuan Gong via zxu)
|
acquired containers. (Xuan Gong via zxu)
|
||||||
|
|
||||||
|
YARN-3992. TestApplicationPriority.testApplicationPriorityAllocation fails
|
||||||
|
intermittently. (Contributed by Sunil G)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -312,13 +312,17 @@ public class MockAM {
|
||||||
|
|
||||||
public List<Container> allocateAndWaitForContainers(int nContainer,
|
public List<Container> allocateAndWaitForContainers(int nContainer,
|
||||||
int memory, MockNM nm) throws Exception {
|
int memory, MockNM nm) throws Exception {
|
||||||
|
return allocateAndWaitForContainers("ANY", nContainer, memory, nm);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Container> allocateAndWaitForContainers(String host,
|
||||||
|
int nContainer, int memory, MockNM nm) throws Exception {
|
||||||
// AM request for containers
|
// AM request for containers
|
||||||
allocate("ANY", memory, nContainer, null);
|
allocate(host, memory, nContainer, null);
|
||||||
// kick the scheduler
|
// kick the scheduler
|
||||||
nm.nodeHeartbeat(true);
|
nm.nodeHeartbeat(true);
|
||||||
List<Container> conts =
|
List<Container> conts = allocate(new ArrayList<ResourceRequest>(), null)
|
||||||
allocate(new ArrayList<ResourceRequest>(), null)
|
.getAllocatedContainers();
|
||||||
.getAllocatedContainers();
|
|
||||||
while (conts.size() < nContainer) {
|
while (conts.size() < nContainer) {
|
||||||
nm.nodeHeartbeat(true);
|
nm.nodeHeartbeat(true);
|
||||||
conts.addAll(allocate(new ArrayList<ResourceRequest>(),
|
conts.addAll(allocate(new ArrayList<ResourceRequest>(),
|
||||||
|
|
|
@ -22,20 +22,16 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
|
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
|
@ -44,7 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
|
@ -59,8 +54,6 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestApplicationPriority {
|
public class TestApplicationPriority {
|
||||||
private static final Log LOG = LogFactory
|
|
||||||
.getLog(TestApplicationPriority.class);
|
|
||||||
private final int GB = 1024;
|
private final int GB = 1024;
|
||||||
|
|
||||||
private YarnConfiguration conf;
|
private YarnConfiguration conf;
|
||||||
|
@ -166,19 +159,10 @@ public class TestApplicationPriority {
|
||||||
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
|
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
|
||||||
am1.registerAppAttempt();
|
am1.registerAppAttempt();
|
||||||
|
|
||||||
// add request for containers
|
// allocate 7 containers for App1
|
||||||
am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 7);
|
List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1",
|
||||||
AllocateResponse alloc1Response = am1.schedule(); // send the request
|
7, 2 * GB, nm1);
|
||||||
|
|
||||||
// kick the scheduler, 7 containers will be allocated for App1
|
|
||||||
nm1.nodeHeartbeat(true);
|
|
||||||
while (alloc1Response.getAllocatedContainers().size() < 1) {
|
|
||||||
LOG.info("Waiting for containers to be created for app 1...");
|
|
||||||
Thread.sleep(100);
|
|
||||||
alloc1Response = am1.schedule();
|
|
||||||
}
|
|
||||||
|
|
||||||
List<Container> allocated1 = alloc1Response.getAllocatedContainers();
|
|
||||||
Assert.assertEquals(7, allocated1.size());
|
Assert.assertEquals(7, allocated1.size());
|
||||||
Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
|
Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
|
||||||
|
|
||||||
|
@ -193,9 +177,7 @@ public class TestApplicationPriority {
|
||||||
RMApp app2 = rm.submitApp(1 * GB, appPriority2);
|
RMApp app2 = rm.submitApp(1 * GB, appPriority2);
|
||||||
|
|
||||||
// kick the scheduler, 1 GB which was free is given to AM of App2
|
// kick the scheduler, 1 GB which was free is given to AM of App2
|
||||||
nm1.nodeHeartbeat(true);
|
MockAM am2 = MockRM.launchAM(app2, rm, nm1);
|
||||||
MockAM am2 = rm.sendAMLaunched(app2.getCurrentAppAttempt()
|
|
||||||
.getAppAttemptId());
|
|
||||||
am2.registerAppAttempt();
|
am2.registerAppAttempt();
|
||||||
|
|
||||||
// check node report, 16 GB used and 0 GB available
|
// check node report, 16 GB used and 0 GB available
|
||||||
|
@ -210,7 +192,7 @@ public class TestApplicationPriority {
|
||||||
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
|
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
|
||||||
.get(app1.getApplicationId()).getCurrentAppAttempt();
|
.get(app1.getApplicationId()).getCurrentAppAttempt();
|
||||||
|
|
||||||
// kill 2 containers to free up some space
|
// kill 2 containers of App1 to free up some space
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
for (Container c : allocated1) {
|
for (Container c : allocated1) {
|
||||||
if (++counter > 2) {
|
if (++counter > 2) {
|
||||||
|
@ -224,22 +206,16 @@ public class TestApplicationPriority {
|
||||||
Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory());
|
Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory());
|
||||||
Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory());
|
Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory());
|
||||||
|
|
||||||
// add request for containers App1
|
// send updated request for App1
|
||||||
am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 10);
|
am1.allocate("127.0.0.1", 2 * GB, 10, new ArrayList<ContainerId>());
|
||||||
am1.schedule(); // send the request for App1
|
|
||||||
|
|
||||||
// add request for containers App2
|
|
||||||
am2.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 3);
|
|
||||||
AllocateResponse alloc1Response4 = am2.schedule(); // send the request
|
|
||||||
|
|
||||||
// kick the scheduler, since App2 priority is more than App1, it will get
|
// kick the scheduler, since App2 priority is more than App1, it will get
|
||||||
// remaining cluster space.
|
// remaining cluster space.
|
||||||
nm1.nodeHeartbeat(true);
|
List<Container> allocated2 = am2.allocateAndWaitForContainers("127.0.0.1",
|
||||||
while (alloc1Response4.getAllocatedContainers().size() < 1) {
|
2, 2 * GB, nm1);
|
||||||
LOG.info("Waiting for containers to be created for app 2...");
|
|
||||||
Thread.sleep(100);
|
// App2 has got 2 containers now.
|
||||||
alloc1Response4 = am2.schedule();
|
Assert.assertEquals(2, allocated2.size());
|
||||||
}
|
|
||||||
|
|
||||||
// check node report, 16 GB used and 0 GB available
|
// check node report, 16 GB used and 0 GB available
|
||||||
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
|
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
|
||||||
|
@ -268,19 +244,10 @@ public class TestApplicationPriority {
|
||||||
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
|
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
|
||||||
am1.registerAppAttempt();
|
am1.registerAppAttempt();
|
||||||
|
|
||||||
// add request for containers
|
|
||||||
am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 1 * GB, 1, 7);
|
|
||||||
AllocateResponse alloc1Response = am1.schedule(); // send the request
|
|
||||||
|
|
||||||
// kick the scheduler, 7 containers will be allocated for App1
|
// kick the scheduler, 7 containers will be allocated for App1
|
||||||
nm1.nodeHeartbeat(true);
|
List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1",
|
||||||
while (alloc1Response.getAllocatedContainers().size() < 1) {
|
7, 1 * GB, nm1);
|
||||||
LOG.info("Waiting for containers to be created for app 1...");
|
|
||||||
Thread.sleep(100);
|
|
||||||
alloc1Response = am1.schedule();
|
|
||||||
}
|
|
||||||
|
|
||||||
List<Container> allocated1 = alloc1Response.getAllocatedContainers();
|
|
||||||
Assert.assertEquals(7, allocated1.size());
|
Assert.assertEquals(7, allocated1.size());
|
||||||
Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
|
Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
|
||||||
|
|
||||||
|
@ -308,9 +275,7 @@ public class TestApplicationPriority {
|
||||||
rm.killApp(app1.getApplicationId());
|
rm.killApp(app1.getApplicationId());
|
||||||
|
|
||||||
// kick the scheduler, app3 (high among pending) gets free space
|
// kick the scheduler, app3 (high among pending) gets free space
|
||||||
nm1.nodeHeartbeat(true);
|
MockAM am3 = MockRM.launchAM(app3, rm, nm1);
|
||||||
MockAM am3 = rm.sendAMLaunched(app3.getCurrentAppAttempt()
|
|
||||||
.getAppAttemptId());
|
|
||||||
am3.registerAppAttempt();
|
am3.registerAppAttempt();
|
||||||
|
|
||||||
// check node report, 1 GB used and 7 GB available
|
// check node report, 1 GB used and 7 GB available
|
||||||
|
|
Loading…
Reference in New Issue