YARN-7642. Add test case to verify context update after container promotion or demotion with or without auto update. Contributed by Weiwei Yang.
(cherry picked from commit 89b6c482c1
)
This commit is contained in:
parent
2b77729fdf
commit
108f8b8fa1
|
@ -33,6 +33,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
|
@ -50,6 +51,7 @@ import org.apache.hadoop.security.SecurityUtil;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
|
@ -67,6 +69,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
|
@ -96,6 +99,7 @@ import com.google.common.base.Supplier;
|
|||
@RunWith(value = Parameterized.class)
|
||||
public class TestAMRMClient {
|
||||
private String schedulerName = null;
|
||||
private boolean autoUpdate = false;
|
||||
private Configuration conf = null;
|
||||
private MiniYARNCluster yarnCluster = null;
|
||||
private YarnClient yarnClient = null;
|
||||
|
@ -115,16 +119,19 @@ public class TestAMRMClient {
|
|||
private String[] racks;
|
||||
private final static int DEFAULT_ITERATION = 3;
|
||||
|
||||
public TestAMRMClient(String schedulerName) {
|
||||
public TestAMRMClient(String schedulerName, boolean autoUpdate) {
|
||||
this.schedulerName = schedulerName;
|
||||
this.autoUpdate = autoUpdate;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
List<Object[]> list = new ArrayList<Object[]>(2);
|
||||
list.add(new Object[] {CapacityScheduler.class.getName()});
|
||||
list.add(new Object[] {FairScheduler.class.getName()});
|
||||
return list;
|
||||
// Currently only capacity scheduler supports auto update.
|
||||
return Arrays.asList(new Object[][] {
|
||||
{CapacityScheduler.class.getName(), true},
|
||||
{CapacityScheduler.class.getName(), false},
|
||||
{FairScheduler.class.getName(), false}
|
||||
});
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -135,6 +142,10 @@ public class TestAMRMClient {
|
|||
|
||||
private void createClusterAndStartApplication() throws Exception {
|
||||
// start minicluster
|
||||
this.conf = conf;
|
||||
if (autoUpdate) {
|
||||
conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
|
||||
}
|
||||
conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
|
||||
conf.setLong(
|
||||
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
||||
|
@ -1148,6 +1159,141 @@ public class TestAMRMClient {
|
|||
assertEquals(1, updatedContainers.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAMRMContainerPromotionAndDemotionWithAutoUpdate()
|
||||
throws Exception {
|
||||
AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
|
||||
(AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
|
||||
.createAMRMClient();
|
||||
amClient.init(conf);
|
||||
amClient.start();
|
||||
|
||||
// start am nm client
|
||||
NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
|
||||
Assert.assertNotNull(nmClient);
|
||||
nmClient.init(conf);
|
||||
nmClient.start();
|
||||
assertEquals(STATE.STARTED, nmClient.getServiceState());
|
||||
|
||||
amClient.registerApplicationMaster("Host", 10000, "");
|
||||
|
||||
// setup container request
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
// START OPPORTUNISTIC Container, Send allocation request to RM
|
||||
Resource reqResource = Resource.newInstance(512, 1);
|
||||
amClient.addContainerRequest(
|
||||
new AMRMClient.ContainerRequest(reqResource, null, null, priority2, 0,
|
||||
true, null, ExecutionTypeRequest
|
||||
.newInstance(ExecutionType.OPPORTUNISTIC, true)));
|
||||
|
||||
// RM should allocate container within 1 calls to allocate()
|
||||
AllocateResponse allocResponse = waitForAllocation(amClient, 1, 0);
|
||||
|
||||
assertEquals(1, allocResponse.getAllocatedContainers().size());
|
||||
startContainer(allocResponse, nmClient);
|
||||
|
||||
Container c = allocResponse.getAllocatedContainers().get(0);
|
||||
amClient.requestContainerUpdate(c,
|
||||
UpdateContainerRequest.newInstance(c.getVersion(),
|
||||
c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
|
||||
null, ExecutionType.GUARANTEED));
|
||||
|
||||
allocResponse = waitForAllocation(amClient, 0, 1);
|
||||
|
||||
// Make sure container is updated.
|
||||
UpdatedContainer updatedContainer = allocResponse
|
||||
.getUpdatedContainers().get(0);
|
||||
|
||||
// If container auto update is not enabled, we need to notify
|
||||
// NM about this update.
|
||||
if (!autoUpdate) {
|
||||
nmClient.updateContainerResource(updatedContainer.getContainer());
|
||||
}
|
||||
|
||||
// Wait until NM context updated, or fail on timeout.
|
||||
waitForNMContextUpdate(updatedContainer, ExecutionType.GUARANTEED);
|
||||
|
||||
// Once promoted, demote it back to OPPORTUNISTIC
|
||||
amClient.requestContainerUpdate(updatedContainer.getContainer(),
|
||||
UpdateContainerRequest.newInstance(
|
||||
updatedContainer.getContainer().getVersion(),
|
||||
updatedContainer.getContainer().getId(),
|
||||
ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
|
||||
null, ExecutionType.OPPORTUNISTIC));
|
||||
|
||||
allocResponse = waitForAllocation(amClient, 0, 1);
|
||||
|
||||
// Make sure container is updated.
|
||||
updatedContainer = allocResponse.getUpdatedContainers().get(0);
|
||||
|
||||
if (!autoUpdate) {
|
||||
nmClient.updateContainerResource(updatedContainer.getContainer());
|
||||
}
|
||||
|
||||
// Wait until NM context updated, or fail on timeout.
|
||||
waitForNMContextUpdate(updatedContainer, ExecutionType.OPPORTUNISTIC);
|
||||
|
||||
amClient.close();
|
||||
}
|
||||
|
||||
private AllocateResponse waitForAllocation(AMRMClient amrmClient,
|
||||
int expectedAllocatedContainerNum, int expectedUpdatedContainerNum)
|
||||
throws Exception {
|
||||
AllocateResponse allocResponse = null;
|
||||
int iteration = 100;
|
||||
while(iteration>0) {
|
||||
allocResponse = amrmClient.allocate(0.1f);
|
||||
int actualAllocated = allocResponse.getAllocatedContainers().size();
|
||||
int actualUpdated = allocResponse.getUpdatedContainers().size();
|
||||
if (expectedAllocatedContainerNum == actualAllocated &&
|
||||
expectedUpdatedContainerNum == actualUpdated) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
iteration--;
|
||||
}
|
||||
return allocResponse;
|
||||
}
|
||||
|
||||
private void waitForNMContextUpdate(final UpdatedContainer updatedContainer,
|
||||
final ExecutionType expectedType) {
|
||||
for (int i=0; i<nodeCount; i++) {
|
||||
final NodeManager nm = yarnCluster.getNodeManager(i);
|
||||
if (nm.getNMContext().getNodeId()
|
||||
.equals(updatedContainer.getContainer().getNodeId())) {
|
||||
try {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override public Boolean get() {
|
||||
org.apache.hadoop.yarn.server.nodemanager.containermanager
|
||||
.container.Container nmContainer =
|
||||
nm.getNMContext().getContainers()
|
||||
.get(updatedContainer.getContainer().getId());
|
||||
if (nmContainer != null) {
|
||||
ExecutionType actual = nmContainer.getContainerTokenIdentifier()
|
||||
.getExecutionType();
|
||||
return actual.equals(expectedType);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}, 1000, 30000);
|
||||
} catch (TimeoutException e) {
|
||||
fail("Times out waiting for container state in"
|
||||
+ " NM context to be updated");
|
||||
} catch (InterruptedException e) {
|
||||
// Ignorable.
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// Iterated all nodes but still can't get a match
|
||||
if (i == nodeCount -1) {
|
||||
fail("Container doesn't exist in NM context.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testAMRMClientWithContainerPromotion()
|
||||
throws YarnException, IOException {
|
||||
|
@ -1433,7 +1579,9 @@ public class TestAMRMClient {
|
|||
for (UpdatedContainer updatedContainer : allocResponse
|
||||
.getUpdatedContainers()) {
|
||||
Container container = updatedContainer.getContainer();
|
||||
nmClient.increaseContainerResource(container);
|
||||
if (!autoUpdate) {
|
||||
nmClient.increaseContainerResource(container);
|
||||
}
|
||||
// NodeManager may still need some time to get the stable
|
||||
// container status
|
||||
while (true) {
|
||||
|
|
Loading…
Reference in New Issue