diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index ffe9ce3b9dc..9f5bad42073 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.TimeoutException; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -50,6 +51,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -67,6 +69,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -96,6 +99,7 @@ @RunWith(value = Parameterized.class) public class TestAMRMClient { private String schedulerName = null; + private boolean autoUpdate = false; private Configuration conf = null; private MiniYARNCluster yarnCluster = null; private YarnClient yarnClient = null; @@ -115,16 +119,19 @@ public class TestAMRMClient { private String[] racks; private final static int DEFAULT_ITERATION = 3; - public TestAMRMClient(String schedulerName) { + public TestAMRMClient(String schedulerName, boolean autoUpdate) { this.schedulerName = schedulerName; + this.autoUpdate = autoUpdate; } @Parameterized.Parameters public static Collection data() { - List list = new ArrayList(2); - list.add(new Object[] {CapacityScheduler.class.getName()}); - list.add(new Object[] {FairScheduler.class.getName()}); - return list; + // Currently only capacity scheduler supports auto update. + return Arrays.asList(new Object[][] { + {CapacityScheduler.class.getName(), true}, + {CapacityScheduler.class.getName(), false}, + {FairScheduler.class.getName(), false} + }); } @Before @@ -135,6 +142,10 @@ public void setup() throws Exception { private void createClusterAndStartApplication() throws Exception { // start minicluster + this.conf = conf; + if (autoUpdate) { + conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true); + } conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName); conf.setLong( YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, @@ -1148,6 +1159,141 @@ private void doContainerResourceChange( assertEquals(1, updatedContainers.size()); } + @Test + public void testAMRMContainerPromotionAndDemotionWithAutoUpdate() + throws Exception { + AMRMClientImpl amClient = + (AMRMClientImpl) AMRMClient + .createAMRMClient(); + amClient.init(conf); + amClient.start(); + + // start am nm client + NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient(); + Assert.assertNotNull(nmClient); + nmClient.init(conf); + nmClient.start(); + assertEquals(STATE.STARTED, nmClient.getServiceState()); + + amClient.registerApplicationMaster("Host", 10000, ""); + + // setup container request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // START OPPORTUNISTIC Container, Send allocation request to RM + Resource reqResource = Resource.newInstance(512, 1); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(reqResource, null, null, priority2, 0, + true, null, ExecutionTypeRequest + .newInstance(ExecutionType.OPPORTUNISTIC, true))); + + // RM should allocate container within 1 calls to allocate() + AllocateResponse allocResponse = waitForAllocation(amClient, 1, 0); + + assertEquals(1, allocResponse.getAllocatedContainers().size()); + startContainer(allocResponse, nmClient); + + Container c = allocResponse.getAllocatedContainers().get(0); + amClient.requestContainerUpdate(c, + UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED)); + + allocResponse = waitForAllocation(amClient, 0, 1); + + // Make sure container is updated. + UpdatedContainer updatedContainer = allocResponse + .getUpdatedContainers().get(0); + + // If container auto update is not enabled, we need to notify + // NM about this update. + if (!autoUpdate) { + nmClient.updateContainerResource(updatedContainer.getContainer()); + } + + // Wait until NM context updated, or fail on timeout. + waitForNMContextUpdate(updatedContainer, ExecutionType.GUARANTEED); + + // Once promoted, demote it back to OPPORTUNISTIC + amClient.requestContainerUpdate(updatedContainer.getContainer(), + UpdateContainerRequest.newInstance( + updatedContainer.getContainer().getVersion(), + updatedContainer.getContainer().getId(), + ContainerUpdateType.DEMOTE_EXECUTION_TYPE, + null, ExecutionType.OPPORTUNISTIC)); + + allocResponse = waitForAllocation(amClient, 0, 1); + + // Make sure container is updated. + updatedContainer = allocResponse.getUpdatedContainers().get(0); + + if (!autoUpdate) { + nmClient.updateContainerResource(updatedContainer.getContainer()); + } + + // Wait until NM context updated, or fail on timeout. + waitForNMContextUpdate(updatedContainer, ExecutionType.OPPORTUNISTIC); + + amClient.close(); + } + + private AllocateResponse waitForAllocation(AMRMClient amrmClient, + int expectedAllocatedContainerNum, int expectedUpdatedContainerNum) + throws Exception { + AllocateResponse allocResponse = null; + int iteration = 100; + while(iteration>0) { + allocResponse = amrmClient.allocate(0.1f); + int actualAllocated = allocResponse.getAllocatedContainers().size(); + int actualUpdated = allocResponse.getUpdatedContainers().size(); + if (expectedAllocatedContainerNum == actualAllocated && + expectedUpdatedContainerNum == actualUpdated) { + break; + } + Thread.sleep(100); + iteration--; + } + return allocResponse; + } + + private void waitForNMContextUpdate(final UpdatedContainer updatedContainer, + final ExecutionType expectedType) { + for (int i=0; i() { + @Override public Boolean get() { + org.apache.hadoop.yarn.server.nodemanager.containermanager + .container.Container nmContainer = + nm.getNMContext().getContainers() + .get(updatedContainer.getContainer().getId()); + if (nmContainer != null) { + ExecutionType actual = nmContainer.getContainerTokenIdentifier() + .getExecutionType(); + return actual.equals(expectedType); + } + return false; + } + }, 1000, 30000); + } catch (TimeoutException e) { + fail("Times out waiting for container state in" + + " NM context to be updated"); + } catch (InterruptedException e) { + // Ignorable. + } + break; + } + + // Iterated all nodes but still can't get a match + if (i == nodeCount -1) { + fail("Container doesn't exist in NM context."); + } + } + } + @Test(timeout=60000) public void testAMRMClientWithContainerPromotion() throws YarnException, IOException { @@ -1433,7 +1579,9 @@ private void updateContainerExecType(AllocateResponse allocResponse, for (UpdatedContainer updatedContainer : allocResponse .getUpdatedContainers()) { Container container = updatedContainer.getContainer(); - nmClient.increaseContainerResource(container); + if (!autoUpdate) { + nmClient.increaseContainerResource(container); + } // NodeManager may still need some time to get the stable // container status while (true) {