YARN-7642. Add test case to verify context update after container promotion or demotion with or without auto update. Contributed by Weiwei Yang.

(cherry picked from commit 89b6c482c1)
(cherry picked from commit 108f8b8fa1)
This commit is contained in:
Weiwei Yang 2017-12-15 11:58:52 +08:00
parent 8d3d7fa1c5
commit a85036aed1
1 changed files with 154 additions and 6 deletions

View File

@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -50,6 +51,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -67,6 +69,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -96,6 +99,7 @@ import com.google.common.base.Supplier;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
public class TestAMRMClient { public class TestAMRMClient {
private String schedulerName = null; private String schedulerName = null;
private boolean autoUpdate = false;
private Configuration conf = null; private Configuration conf = null;
private MiniYARNCluster yarnCluster = null; private MiniYARNCluster yarnCluster = null;
private YarnClient yarnClient = null; private YarnClient yarnClient = null;
@ -115,16 +119,19 @@ public class TestAMRMClient {
private String[] racks; private String[] racks;
private final static int DEFAULT_ITERATION = 3; private final static int DEFAULT_ITERATION = 3;
public TestAMRMClient(String schedulerName) { public TestAMRMClient(String schedulerName, boolean autoUpdate) {
this.schedulerName = schedulerName; this.schedulerName = schedulerName;
this.autoUpdate = autoUpdate;
} }
@Parameterized.Parameters @Parameterized.Parameters
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
List<Object[]> list = new ArrayList<Object[]>(2); // Currently only capacity scheduler supports auto update.
list.add(new Object[] {CapacityScheduler.class.getName()}); return Arrays.asList(new Object[][] {
list.add(new Object[] {FairScheduler.class.getName()}); {CapacityScheduler.class.getName(), true},
return list; {CapacityScheduler.class.getName(), false},
{FairScheduler.class.getName(), false}
});
} }
@Before @Before
@ -135,6 +142,10 @@ public class TestAMRMClient {
private void createClusterAndStartApplication() throws Exception { private void createClusterAndStartApplication() throws Exception {
// start minicluster // start minicluster
this.conf = conf;
if (autoUpdate) {
conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
}
conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName); conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
conf.setLong( conf.setLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
@ -1148,6 +1159,141 @@ public class TestAMRMClient {
assertEquals(1, updatedContainers.size()); assertEquals(1, updatedContainers.size());
} }
@Test
public void testAMRMContainerPromotionAndDemotionWithAutoUpdate()
throws Exception {
AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
(AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
.createAMRMClient();
amClient.init(conf);
amClient.start();
// start am nm client
NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
Assert.assertNotNull(nmClient);
nmClient.init(conf);
nmClient.start();
assertEquals(STATE.STARTED, nmClient.getServiceState());
amClient.registerApplicationMaster("Host", 10000, "");
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
// START OPPORTUNISTIC Container, Send allocation request to RM
Resource reqResource = Resource.newInstance(512, 1);
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(reqResource, null, null, priority2, 0,
true, null, ExecutionTypeRequest
.newInstance(ExecutionType.OPPORTUNISTIC, true)));
// RM should allocate container within 1 calls to allocate()
AllocateResponse allocResponse = waitForAllocation(amClient, 1, 0);
assertEquals(1, allocResponse.getAllocatedContainers().size());
startContainer(allocResponse, nmClient);
Container c = allocResponse.getAllocatedContainers().get(0);
amClient.requestContainerUpdate(c,
UpdateContainerRequest.newInstance(c.getVersion(),
c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED));
allocResponse = waitForAllocation(amClient, 0, 1);
// Make sure container is updated.
UpdatedContainer updatedContainer = allocResponse
.getUpdatedContainers().get(0);
// If container auto update is not enabled, we need to notify
// NM about this update.
if (!autoUpdate) {
nmClient.updateContainerResource(updatedContainer.getContainer());
}
// Wait until NM context updated, or fail on timeout.
waitForNMContextUpdate(updatedContainer, ExecutionType.GUARANTEED);
// Once promoted, demote it back to OPPORTUNISTIC
amClient.requestContainerUpdate(updatedContainer.getContainer(),
UpdateContainerRequest.newInstance(
updatedContainer.getContainer().getVersion(),
updatedContainer.getContainer().getId(),
ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
null, ExecutionType.OPPORTUNISTIC));
allocResponse = waitForAllocation(amClient, 0, 1);
// Make sure container is updated.
updatedContainer = allocResponse.getUpdatedContainers().get(0);
if (!autoUpdate) {
nmClient.updateContainerResource(updatedContainer.getContainer());
}
// Wait until NM context updated, or fail on timeout.
waitForNMContextUpdate(updatedContainer, ExecutionType.OPPORTUNISTIC);
amClient.close();
}
private AllocateResponse waitForAllocation(AMRMClient amrmClient,
int expectedAllocatedContainerNum, int expectedUpdatedContainerNum)
throws Exception {
AllocateResponse allocResponse = null;
int iteration = 100;
while(iteration>0) {
allocResponse = amrmClient.allocate(0.1f);
int actualAllocated = allocResponse.getAllocatedContainers().size();
int actualUpdated = allocResponse.getUpdatedContainers().size();
if (expectedAllocatedContainerNum == actualAllocated &&
expectedUpdatedContainerNum == actualUpdated) {
break;
}
Thread.sleep(100);
iteration--;
}
return allocResponse;
}
private void waitForNMContextUpdate(final UpdatedContainer updatedContainer,
final ExecutionType expectedType) {
for (int i=0; i<nodeCount; i++) {
final NodeManager nm = yarnCluster.getNodeManager(i);
if (nm.getNMContext().getNodeId()
.equals(updatedContainer.getContainer().getNodeId())) {
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
org.apache.hadoop.yarn.server.nodemanager.containermanager
.container.Container nmContainer =
nm.getNMContext().getContainers()
.get(updatedContainer.getContainer().getId());
if (nmContainer != null) {
ExecutionType actual = nmContainer.getContainerTokenIdentifier()
.getExecutionType();
return actual.equals(expectedType);
}
return false;
}
}, 1000, 30000);
} catch (TimeoutException e) {
fail("Times out waiting for container state in"
+ " NM context to be updated");
} catch (InterruptedException e) {
// Ignorable.
}
break;
}
// Iterated all nodes but still can't get a match
if (i == nodeCount -1) {
fail("Container doesn't exist in NM context.");
}
}
}
@Test(timeout=60000) @Test(timeout=60000)
public void testAMRMClientWithContainerPromotion() public void testAMRMClientWithContainerPromotion()
throws YarnException, IOException { throws YarnException, IOException {
@ -1433,7 +1579,9 @@ public class TestAMRMClient {
for (UpdatedContainer updatedContainer : allocResponse for (UpdatedContainer updatedContainer : allocResponse
.getUpdatedContainers()) { .getUpdatedContainers()) {
Container container = updatedContainer.getContainer(); Container container = updatedContainer.getContainer();
if (!autoUpdate) {
nmClient.increaseContainerResource(container); nmClient.increaseContainerResource(container);
}
// NodeManager may still need some time to get the stable // NodeManager may still need some time to get the stable
// container status // container status
while (true) { while (true) {