YARN-7642. Add test case to verify context update after container promotion or demotion with or without auto update. Contributed by Weiwei Yang.

This commit is contained in:
Weiwei Yang 2017-12-15 11:58:52 +08:00
parent e1cb278cd0
commit 89b6c482c1
1 changed files with 151 additions and 6 deletions

View File

@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -51,6 +52,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -68,6 +70,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -96,6 +99,7 @@ import org.eclipse.jetty.util.log.Log;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
public class TestAMRMClient { public class TestAMRMClient {
private String schedulerName = null; private String schedulerName = null;
private boolean autoUpdate = false;
private Configuration conf = null; private Configuration conf = null;
private MiniYARNCluster yarnCluster = null; private MiniYARNCluster yarnCluster = null;
private YarnClient yarnClient = null; private YarnClient yarnClient = null;
@ -115,16 +119,19 @@ public class TestAMRMClient {
private String[] racks; private String[] racks;
private final static int DEFAULT_ITERATION = 3; private final static int DEFAULT_ITERATION = 3;
public TestAMRMClient(String schedulerName) { public TestAMRMClient(String schedulerName, boolean autoUpdate) {
this.schedulerName = schedulerName; this.schedulerName = schedulerName;
this.autoUpdate = autoUpdate;
} }
@Parameterized.Parameters @Parameterized.Parameters
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
List<Object[]> list = new ArrayList<Object[]>(2); // Currently only capacity scheduler supports auto update.
list.add(new Object[] {CapacityScheduler.class.getName()}); return Arrays.asList(new Object[][] {
list.add(new Object[] {FairScheduler.class.getName()}); {CapacityScheduler.class.getName(), true},
return list; {CapacityScheduler.class.getName(), false},
{FairScheduler.class.getName(), false}
});
} }
@Before @Before
@ -137,6 +144,9 @@ public class TestAMRMClient {
throws Exception { throws Exception {
// start minicluster // start minicluster
this.conf = conf; this.conf = conf;
if (autoUpdate) {
conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
}
conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName); conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
conf.setLong( conf.setLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
@ -1157,6 +1167,139 @@ public class TestAMRMClient {
assertEquals(1, updatedContainers.size()); assertEquals(1, updatedContainers.size());
} }
@Test
public void testAMRMContainerPromotionAndDemotionWithAutoUpdate()
throws Exception {
AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
(AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
.createAMRMClient();
amClient.init(conf);
amClient.start();
// start am nm client
NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
Assert.assertNotNull(nmClient);
nmClient.init(conf);
nmClient.start();
assertEquals(STATE.STARTED, nmClient.getServiceState());
amClient.registerApplicationMaster("Host", 10000, "");
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
// START OPPORTUNISTIC Container, Send allocation request to RM
Resource reqResource = Resource.newInstance(512, 1);
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(reqResource, null, null, priority2, 0,
true, null, ExecutionTypeRequest
.newInstance(ExecutionType.OPPORTUNISTIC, true)));
// RM should allocate container within 1 calls to allocate()
AllocateResponse allocResponse = waitForAllocation(amClient, 1, 0);
assertEquals(1, allocResponse.getAllocatedContainers().size());
startContainer(allocResponse, nmClient);
Container c = allocResponse.getAllocatedContainers().get(0);
amClient.requestContainerUpdate(c,
UpdateContainerRequest.newInstance(c.getVersion(),
c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED));
allocResponse = waitForAllocation(amClient, 0, 1);
// Make sure container is updated.
UpdatedContainer updatedContainer = allocResponse
.getUpdatedContainers().get(0);
// If container auto update is not enabled, we need to notify
// NM about this update.
if (!autoUpdate) {
nmClient.updateContainerResource(updatedContainer.getContainer());
}
// Wait until NM context updated, or fail on timeout.
waitForNMContextUpdate(updatedContainer, ExecutionType.GUARANTEED);
// Once promoted, demote it back to OPPORTUNISTIC
amClient.requestContainerUpdate(updatedContainer.getContainer(),
UpdateContainerRequest.newInstance(
updatedContainer.getContainer().getVersion(),
updatedContainer.getContainer().getId(),
ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
null, ExecutionType.OPPORTUNISTIC));
allocResponse = waitForAllocation(amClient, 0, 1);
// Make sure container is updated.
updatedContainer = allocResponse.getUpdatedContainers().get(0);
if (!autoUpdate) {
nmClient.updateContainerResource(updatedContainer.getContainer());
}
// Wait until NM context updated, or fail on timeout.
waitForNMContextUpdate(updatedContainer, ExecutionType.OPPORTUNISTIC);
amClient.close();
}
private AllocateResponse waitForAllocation(AMRMClient amrmClient,
int expectedAllocatedContainerNum, int expectedUpdatedContainerNum)
throws Exception {
AllocateResponse allocResponse = null;
int iteration = 100;
while(iteration>0) {
allocResponse = amrmClient.allocate(0.1f);
int actualAllocated = allocResponse.getAllocatedContainers().size();
int actualUpdated = allocResponse.getUpdatedContainers().size();
if (expectedAllocatedContainerNum == actualAllocated &&
expectedUpdatedContainerNum == actualUpdated) {
break;
}
Thread.sleep(100);
iteration--;
}
return allocResponse;
}
private void waitForNMContextUpdate(UpdatedContainer updatedContainer,
ExecutionType expectedType) {
for (int i=0; i<nodeCount; i++) {
NodeManager nm = yarnCluster.getNodeManager(i);
if (nm.getNMContext().getNodeId()
.equals(updatedContainer.getContainer().getNodeId())) {
try {
GenericTestUtils.waitFor(() -> {
org.apache.hadoop.yarn.server.nodemanager.containermanager
.container.Container nmContainer =
nm.getNMContext().getContainers()
.get(updatedContainer.getContainer().getId());
if (nmContainer != null) {
ExecutionType actual = nmContainer.getContainerTokenIdentifier()
.getExecutionType();
return actual.equals(expectedType);
}
return false;
}, 1000, 30000);
} catch (TimeoutException e) {
fail("Times out waiting for container state in"
+ " NM context to be updated");
} catch (InterruptedException e) {
// Ignorable.
}
break;
}
// Iterated all nodes but still can't get a match
if (i == nodeCount -1) {
fail("Container doesn't exist in NM context.");
}
}
}
@Test(timeout=60000) @Test(timeout=60000)
public void testAMRMClientWithContainerPromotion() public void testAMRMClientWithContainerPromotion()
throws YarnException, IOException { throws YarnException, IOException {
@ -1446,7 +1589,9 @@ public class TestAMRMClient {
for (UpdatedContainer updatedContainer : allocResponse for (UpdatedContainer updatedContainer : allocResponse
.getUpdatedContainers()) { .getUpdatedContainers()) {
Container container = updatedContainer.getContainer(); Container container = updatedContainer.getContainer();
nmClient.increaseContainerResource(container); if (!autoUpdate) {
nmClient.increaseContainerResource(container);
}
// NodeManager may still need some time to get the stable // NodeManager may still need some time to get the stable
// container status // container status
while (true) { while (true) {