YARN-6218. Fix TestAMRMClient when using FairScheduler. (Miklos Szegedi via rchiang)

(cherry picked from commit 30b91ff9540e35195af834d1bd5983114a556c6d)
This commit is contained in:
Ray Chiang 2017-03-03 12:55:45 -08:00
parent 5eca427da2
commit 303ee13e3c
2 changed files with 87 additions and 55 deletions

View File

@ -86,47 +86,71 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.mortbay.log.Log; import org.mortbay.log.Log;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
/**
* Test application master client class to resource manager.
*/
@RunWith(value = Parameterized.class)
public class TestAMRMClient { public class TestAMRMClient {
static Configuration conf = null; private String schedulerName = null;
static MiniYARNCluster yarnCluster = null; private Configuration conf = null;
static YarnClient yarnClient = null; private MiniYARNCluster yarnCluster = null;
static List<NodeReport> nodeReports = null; private YarnClient yarnClient = null;
static ApplicationAttemptId attemptId = null; private List<NodeReport> nodeReports = null;
static int nodeCount = 3; private ApplicationAttemptId attemptId = null;
private int nodeCount = 3;
static final int rolling_interval_sec = 13; static final int rolling_interval_sec = 13;
static final long am_expire_ms = 4000; static final long am_expire_ms = 4000;
static Resource capability; private Resource capability;
static Priority priority; private Priority priority;
static Priority priority2; private Priority priority2;
static String node; private String node;
static String rack; private String rack;
static String[] nodes; private String[] nodes;
static String[] racks; private String[] racks;
private final static int DEFAULT_ITERATION = 3; private final static int DEFAULT_ITERATION = 3;
@BeforeClass public TestAMRMClient(String schedulerName) {
public static void setup() throws Exception { this.schedulerName = schedulerName;
}
/**
 * Supplies the constructor arguments for the parameterized runner: one
 * single-element row per scheduler implementation, holding that scheduler's
 * fully qualified class name.
 *
 * @return the parameter rows, CapacityScheduler first, then FairScheduler
 */
@Parameterized.Parameters
public static Collection<Object[]> data() {
  String[] schedulerClassNames = {
      CapacityScheduler.class.getName(),
      FairScheduler.class.getName()
  };
  List<Object[]> parameters =
      new ArrayList<Object[]>(schedulerClassNames.length);
  for (String schedulerClassName : schedulerClassNames) {
    parameters.add(new Object[] {schedulerClassName});
  }
  return parameters;
}
@Before
public void setup() throws Exception {
// start minicluster // start minicluster
conf = new YarnConfiguration(); conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
conf.setLong( conf.setLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
rolling_interval_sec); rolling_interval_sec);
@ -160,10 +184,7 @@ public class TestAMRMClient {
rack = nodeReports.get(0).getRackName(); rack = nodeReports.get(0).getRackName();
nodes = new String[]{ node }; nodes = new String[]{ node };
racks = new String[]{ rack }; racks = new String[]{ rack };
}
@Before
public void startApp() throws Exception {
// submit new app // submit new app
ApplicationSubmissionContext appContext = ApplicationSubmissionContext appContext =
yarnClient.createApplication().getApplicationSubmissionContext(); yarnClient.createApplication().getApplicationSubmissionContext();
@ -221,13 +242,10 @@ public class TestAMRMClient {
} }
@After @After
public void cancelApp() throws YarnException, IOException { public void teardown() throws YarnException, IOException {
yarnClient.killApplication(attemptId.getApplicationId()); yarnClient.killApplication(attemptId.getApplicationId());
attemptId = null; attemptId = null;
}
@AfterClass
public static void tearDown() {
if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) { if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
yarnClient.stop(); yarnClient.stop();
} }
@ -663,8 +681,8 @@ public class TestAMRMClient {
amClient.releaseAssignedContainer(container.getId()); amClient.releaseAssignedContainer(container.getId());
} }
if(allocatedContainerCount < containersRequestedAny) { if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations // let NM heartbeat to RM and trigger allocations
sleep(100); triggerSchedulingWithNMHeartBeat();
} }
} }
@ -686,6 +704,26 @@ public class TestAMRMClient {
} }
} }
/**
 * Make sure we get allocations regardless of timing issues.
 *
 * Instead of sleeping and hoping a heartbeat arrives, this hands a
 * {@link NodeUpdateSchedulerEvent} for every node known to the RM context
 * directly to the scheduler. When the scheduler is a {@link FairScheduler},
 * its {@code update()} is additionally invoked both before and after the
 * node updates.
 */
private void triggerSchedulingWithNMHeartBeat() {
  RMContext rmContext = yarnCluster.getResourceManager().getRMContext();

  // Simulate fair scheduler update thread
  updateIfFairScheduler(rmContext);

  // Trigger NM's heartbeat to RM and trigger allocations
  for (RMNode node : rmContext.getRMNodes().values()) {
    NodeUpdateSchedulerEvent heartbeat = new NodeUpdateSchedulerEvent(node);
    rmContext.getScheduler().handle(heartbeat);
  }

  // Run the fair scheduler update once more after the node updates
  updateIfFairScheduler(rmContext);
}

/**
 * Calls {@code update()} on the context's scheduler when, and only when,
 * that scheduler is a {@link FairScheduler}; otherwise does nothing.
 *
 * @param rmContext the RM context whose scheduler is inspected
 */
private void updateIfFairScheduler(RMContext rmContext) {
  if (!(rmContext.getScheduler() instanceof FairScheduler)) {
    return;
  }
  FairScheduler fairScheduler = (FairScheduler) rmContext.getScheduler();
  fairScheduler.update();
}
@Test (timeout=60000) @Test (timeout=60000)
public void testAllocationWithBlacklist() throws YarnException, IOException { public void testAllocationWithBlacklist() throws YarnException, IOException {
AMRMClientImpl<ContainerRequest> amClient = null; AMRMClientImpl<ContainerRequest> amClient = null;
@ -817,8 +855,8 @@ public class TestAMRMClient {
allocatedContainerCount += allocResponse.getAllocatedContainers().size(); allocatedContainerCount += allocResponse.getAllocatedContainers().size();
if(allocatedContainerCount == 0) { if(allocatedContainerCount == 0) {
// sleep to let NM's heartbeat to RM and trigger allocations // let NM heartbeat to RM and trigger allocations
sleep(100); triggerSchedulingWithNMHeartBeat();
} }
} }
return allocatedContainerCount; return allocatedContainerCount;
@ -940,6 +978,8 @@ public class TestAMRMClient {
@Test(timeout=60000) @Test(timeout=60000)
public void testAMRMClientWithContainerResourceChange() public void testAMRMClientWithContainerResourceChange()
throws YarnException, IOException { throws YarnException, IOException {
// Fair scheduler does not support resource change
Assume.assumeTrue(schedulerName.equals(CapacityScheduler.class.getName()));
AMRMClient<ContainerRequest> amClient = null; AMRMClient<ContainerRequest> amClient = null;
try { try {
// start am rm client // start am rm client
@ -987,8 +1027,8 @@ public class TestAMRMClient {
} }
// send allocation requests // send allocation requests
amClient.allocate(0.1f); amClient.allocate(0.1f);
// sleep to let NM's heartbeat to RM and trigger allocations // let NM heartbeat to RM and trigger allocations
sleep(150); triggerSchedulingWithNMHeartBeat();
// get allocations // get allocations
AllocateResponse allocResponse = amClient.allocate(0.1f); AllocateResponse allocResponse = amClient.allocate(0.1f);
List<Container> containers = allocResponse.getAllocatedContainers(); List<Container> containers = allocResponse.getAllocatedContainers();
@ -1018,14 +1058,14 @@ public class TestAMRMClient {
if (status.getState() == ContainerState.RUNNING) { if (status.getState() == ContainerState.RUNNING) {
break; break;
} }
sleep(100); sleep(10);
} }
} }
} catch (YarnException e) { } catch (YarnException e) {
throw new AssertionError("Exception is not expected: " + e); throw new AssertionError("Exception is not expected: " + e);
} }
// sleep to let NM's heartbeat to RM to confirm container launch // let NM's heartbeat to RM to confirm container launch
sleep(200); triggerSchedulingWithNMHeartBeat();
return containers; return containers;
} }
@ -1075,7 +1115,7 @@ public class TestAMRMClient {
allocResponse.getUpdatedContainers(); allocResponse.getUpdatedContainers();
Assert.assertEquals(1, updatedContainers.size()); Assert.assertEquals(1, updatedContainers.size());
// we should get increase allocation after the next NM's heartbeat to RM // we should get increase allocation after the next NM's heartbeat to RM
sleep(150); triggerSchedulingWithNMHeartBeat();
// get allocations // get allocations
allocResponse = amClient.allocate(0.1f); allocResponse = amClient.allocate(0.1f);
updatedContainers = updatedContainers =
@ -1138,8 +1178,8 @@ public class TestAMRMClient {
} }
if(allocatedContainerCount < containersRequestedAny) { if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations // let NM heartbeat to RM and trigger allocations
sleep(100); triggerSchedulingWithNMHeartBeat();
} }
} }
@ -1221,8 +1261,8 @@ public class TestAMRMClient {
} }
} }
if(numIterations > 0) { if(numIterations > 0) {
// sleep to make sure NM's heartbeat // let NM heartbeat to RM and trigger allocations
sleep(100); triggerSchedulingWithNMHeartBeat();
} }
} }
assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.ask.size());
@ -1280,8 +1320,8 @@ public class TestAMRMClient {
} }
if(allocatedContainers.size() < containersRequestedAny) { if(allocatedContainers.size() < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations // let NM heartbeat to RM and trigger allocations
sleep(100); triggerSchedulingWithNMHeartBeat();
} }
} }
@ -1392,12 +1432,7 @@ public class TestAMRMClient {
while (System.currentTimeMillis() - startTime < while (System.currentTimeMillis() - startTime <
rolling_interval_sec * 1000) { rolling_interval_sec * 1000) {
amClient.allocate(0.1f); amClient.allocate(0.1f);
try { sleep(1000);
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} }
amClient.allocate(0.1f); amClient.allocate(0.1f);
@ -1457,11 +1492,7 @@ public class TestAMRMClient {
} }
} }
amClient.allocate(0.1f); amClient.allocate(0.1f);
try { sleep(1000);
Thread.sleep(1000);
} catch (InterruptedException e) {
// DO NOTHING
}
} }
try { try {

View File

@ -353,7 +353,8 @@ public class FairScheduler extends
* fair shares, deficits, minimum slot allocations, and amount of used and * fair shares, deficits, minimum slot allocations, and amount of used and
* required resources per job. * required resources per job.
*/ */
protected void update() { @VisibleForTesting
public void update() {
try { try {
writeLock.lock(); writeLock.lock();