YARN-1607. TestRM relies on the scheduler assigning multiple containers in a single node update (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1560533 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-01-22 22:08:04 +00:00
parent 2de391de9f
commit 8ceddeb0c3
2 changed files with 33 additions and 22 deletions

View File

@ -357,6 +357,9 @@ Release 2.4.0 - UNRELEASED
YARN-1606. Fix the default value of yarn.resourcemanager.zk-timeout-ms YARN-1606. Fix the default value of yarn.resourcemanager.zk-timeout-ms
in yarn-default.xml (kasha) in yarn-default.xml (kasha)
YARN-1607. TestRM relies on the scheduler assigning multiple containers in
a single node update (Sandy Ryza)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import junit.framework.Assert; import junit.framework.Assert;
@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -46,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@ -56,6 +59,9 @@ public class TestRM {
private static final Log LOG = LogFactory.getLog(TestRM.class); private static final Log LOG = LogFactory.getLog(TestRM.class);
// Milliseconds to sleep for when waiting for something to happen
private final static int WAIT_SLEEP_MS = 100;
@Test @Test
public void testGetNewAppId() throws Exception { public void testGetNewAppId() throws Exception {
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
@ -69,7 +75,7 @@ public class TestRM {
rm.stop(); rm.stop();
} }
@Test @Test (timeout = 30000)
public void testAppWithNoContainers() throws Exception { public void testAppWithNoContainers() throws Exception {
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
@ -91,7 +97,7 @@ public class TestRM {
rm.stop(); rm.stop();
} }
@Test @Test (timeout = 30000)
public void testAppOnMultiNode() throws Exception { public void testAppOnMultiNode() throws Exception {
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
@ -116,30 +122,30 @@ public class TestRM {
am.allocate("h1" , 1000, request, new ArrayList<ContainerId>()); am.allocate("h1" , 1000, request, new ArrayList<ContainerId>());
//kick the scheduler //kick the scheduler
nm1.nodeHeartbeat(true);
List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(), List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers(); new ArrayList<ContainerId>()).getAllocatedContainers();
int contReceived = conts.size(); int contReceived = conts.size();
while (contReceived < 3) {//only 3 containers are available on node1 while (contReceived < 3) {//only 3 containers are available on node1
nm1.nodeHeartbeat(true);
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(), conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers()); new ArrayList<ContainerId>()).getAllocatedContainers());
contReceived = conts.size(); contReceived = conts.size();
LOG.info("Got " + contReceived + " containers. Waiting to get " + 3); LOG.info("Got " + contReceived + " containers. Waiting to get " + 3);
Thread.sleep(2000); Thread.sleep(WAIT_SLEEP_MS);
} }
Assert.assertEquals(3, conts.size()); Assert.assertEquals(3, conts.size());
//send node2 heartbeat //send node2 heartbeat
nm2.nodeHeartbeat(true);
conts = am.allocate(new ArrayList<ResourceRequest>(), conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers(); new ArrayList<ContainerId>()).getAllocatedContainers();
contReceived = conts.size(); contReceived = conts.size();
while (contReceived < 10) { while (contReceived < 10) {
nm2.nodeHeartbeat(true);
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(), conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers()); new ArrayList<ContainerId>()).getAllocatedContainers());
contReceived = conts.size(); contReceived = conts.size();
LOG.info("Got " + contReceived + " containers. Waiting to get " + 10); LOG.info("Got " + contReceived + " containers. Waiting to get " + 10);
Thread.sleep(2000); Thread.sleep(WAIT_SLEEP_MS);
} }
Assert.assertEquals(10, conts.size()); Assert.assertEquals(10, conts.size());
@ -150,7 +156,7 @@ public class TestRM {
rm.stop(); rm.stop();
} }
@Test @Test (timeout = 40000)
public void testNMToken() throws Exception { public void testNMToken() throws Exception {
MockRM rm = new MockRM(); MockRM rm = new MockRM();
try { try {
@ -187,19 +193,17 @@ public class TestRM {
// initially requesting 2 containers. // initially requesting 2 containers.
AllocateResponse response = AllocateResponse response =
am.allocate("h1", 1000, 2, releaseContainerList); am.allocate("h1", 1000, 2, releaseContainerList);
nm1.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size()); Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 2, allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 2,
nmTokens); nmTokens, nm1);
Assert.assertEquals(1, nmTokens.size()); Assert.assertEquals(1, nmTokens.size());
// requesting 2 more containers. // requesting 2 more containers.
response = am.allocate("h1", 1000, 2, releaseContainerList); response = am.allocate("h1", 1000, 2, releaseContainerList);
nm1.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size()); Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 4, allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 4,
nmTokens); nmTokens, nm1);
Assert.assertEquals(1, nmTokens.size()); Assert.assertEquals(1, nmTokens.size());
@ -211,15 +215,19 @@ public class TestRM {
new ArrayList<Container>(); new ArrayList<Container>();
response = am.allocate("h2", 1000, 2, releaseContainerList); response = am.allocate("h2", 1000, 2, releaseContainerList);
nm2.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size()); Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 2, allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 2,
nmTokens); nmTokens, nm2);
Assert.assertEquals(2, nmTokens.size()); Assert.assertEquals(2, nmTokens.size());
// Simulating NM-2 restart. // Simulating NM-2 restart.
nm2 = rm.registerNode("h2:1234", 10000); nm2 = rm.registerNode("h2:1234", 10000);
nm2.nodeHeartbeat(true); // Wait for reconnect to make it through the RM and create a new RMNode
Map<NodeId, RMNode> nodes = rm.getRMContext().getRMNodes();
while (nodes.get(nm2.getNodeId()).getLastNodeHeartBeatResponse()
.getResponseId() > 0) {
Thread.sleep(WAIT_SLEEP_MS);
}
int interval = 40; int interval = 40;
// Wait for nm Token to be cleared. // Wait for nm Token to be cleared.
@ -227,7 +235,7 @@ public class TestRM {
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
nm2.getNodeId()) && interval-- > 0) { nm2.getNodeId()) && interval-- > 0) {
LOG.info("waiting for nmToken to be cleared for : " + nm2.getNodeId()); LOG.info("waiting for nmToken to be cleared for : " + nm2.getNodeId());
Thread.sleep(1000); Thread.sleep(WAIT_SLEEP_MS);
} }
Assert.assertTrue(nmTokenSecretManager Assert.assertTrue(nmTokenSecretManager
.isApplicationAttemptRegistered(attempt.getAppAttemptId())); .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
@ -238,10 +246,9 @@ public class TestRM {
// We should again receive the NMToken. // We should again receive the NMToken.
response = am.allocate("h2", 1000, 2, releaseContainerList); response = am.allocate("h2", 1000, 2, releaseContainerList);
nm2.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size()); Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 4, allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 4,
nmTokens); nmTokens, nm2);
Assert.assertEquals(2, nmTokens.size()); Assert.assertEquals(2, nmTokens.size());
// Now rolling over NMToken masterKey. it should resend the NMToken in // Now rolling over NMToken masterKey. it should resend the NMToken in
@ -270,10 +277,9 @@ public class TestRM {
Assert.assertEquals(0, nmTokens.size()); Assert.assertEquals(0, nmTokens.size());
// We should again receive the NMToken. // We should again receive the NMToken.
response = am.allocate("h2", 1000, 1, releaseContainerList); response = am.allocate("h2", 1000, 1, releaseContainerList);
nm2.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size()); Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 5, allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 5,
nmTokens); nmTokens, nm2);
Assert.assertEquals(1, nmTokens.size()); Assert.assertEquals(1, nmTokens.size());
Assert.assertTrue(nmTokenSecretManager Assert.assertTrue(nmTokenSecretManager
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
@ -305,12 +311,14 @@ public class TestRM {
protected void allocateContainersAndValidateNMTokens(MockAM am, protected void allocateContainersAndValidateNMTokens(MockAM am,
ArrayList<Container> containersReceived, int totalContainerRequested, ArrayList<Container> containersReceived, int totalContainerRequested,
HashMap<String, Token> nmTokens) throws Exception, InterruptedException { HashMap<String, Token> nmTokens, MockNM nm) throws Exception,
InterruptedException {
ArrayList<ContainerId> releaseContainerList = new ArrayList<ContainerId>(); ArrayList<ContainerId> releaseContainerList = new ArrayList<ContainerId>();
AllocateResponse response; AllocateResponse response;
ArrayList<ResourceRequest> resourceRequest = ArrayList<ResourceRequest> resourceRequest =
new ArrayList<ResourceRequest>(); new ArrayList<ResourceRequest>();
while (containersReceived.size() < totalContainerRequested) { while (containersReceived.size() < totalContainerRequested) {
nm.nodeHeartbeat(true);
LOG.info("requesting containers.."); LOG.info("requesting containers..");
response = response =
am.allocate(resourceRequest, releaseContainerList); am.allocate(resourceRequest, releaseContainerList);
@ -326,7 +334,7 @@ public class TestRM {
} }
LOG.info("Got " + containersReceived.size() LOG.info("Got " + containersReceived.size()
+ " containers. Waiting to get " + totalContainerRequested); + " containers. Waiting to get " + totalContainerRequested);
Thread.sleep(500); Thread.sleep(WAIT_SLEEP_MS);
} }
} }