YARN-5978. ContainerScheduler and ContainerManager changes to support ExecType update. (Kartheek Muthyala via asuresh)

This commit is contained in:
Arun Suresh 2017-08-14 19:46:17 -07:00
parent 0446511398
commit 4d7be1d857
17 changed files with 963 additions and 227 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@ -36,6 +37,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@ -142,6 +144,10 @@ private void createClusterAndStartApplication() throws Exception {
// set the minimum allocation so that resource decrease can go under 1024
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
conf.setBoolean(
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
@ -924,8 +930,8 @@ public void testAskWithNodeLabels() {
// add exp=x to ANY
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true, "x"));
Assert.assertEquals(1, client.ask.size());
Assert.assertEquals("x", client.ask.iterator().next()
assertEquals(1, client.ask.size());
assertEquals("x", client.ask.iterator().next()
.getNodeLabelExpression());
// add exp=x then add exp=a to ANY in same priority, only exp=a should kept
@ -933,8 +939,8 @@ public void testAskWithNodeLabels() {
1), null, null, Priority.UNDEFINED, true, "x"));
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true, "a"));
Assert.assertEquals(1, client.ask.size());
Assert.assertEquals("a", client.ask.iterator().next()
assertEquals(1, client.ask.size());
assertEquals("a", client.ask.iterator().next()
.getNodeLabelExpression());
// add exp=x to ANY, rack and node, only resource request has ANY resource
@ -943,10 +949,10 @@ public void testAskWithNodeLabels() {
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true,
"y"));
Assert.assertEquals(1, client.ask.size());
assertEquals(1, client.ask.size());
for (ResourceRequest req : client.ask) {
if (ResourceRequest.ANY.equals(req.getResourceName())) {
Assert.assertEquals("y", req.getNodeLabelExpression());
assertEquals("y", req.getNodeLabelExpression());
} else {
Assert.assertNull(req.getNodeLabelExpression());
}
@ -957,7 +963,7 @@ public void testAskWithNodeLabels() {
new String[] { "node1", "node2" }, Priority.UNDEFINED, true, "y"));
for (ResourceRequest req : client.ask) {
if (ResourceRequest.ANY.equals(req.getResourceName())) {
Assert.assertEquals("y", req.getNodeLabelExpression());
assertEquals("y", req.getNodeLabelExpression());
} else {
Assert.assertNull(req.getNodeLabelExpression());
}
@ -971,7 +977,7 @@ private void verifyAddRequestFailed(AMRMClient<ContainerRequest> client,
} catch (InvalidContainerRequestException e) {
return;
}
Assert.fail();
fail();
}
@Test(timeout=30000)
@ -1042,7 +1048,8 @@ private List<Container> allocateAndStartContainers(
// get allocations
AllocateResponse allocResponse = amClient.allocate(0.1f);
List<Container> containers = allocResponse.getAllocatedContainers();
Assert.assertEquals(num, containers.size());
assertEquals(num, containers.size());
// build container launch context
Credentials ts = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
@ -1083,14 +1090,14 @@ private List<Container> allocateAndStartContainers(
private void doContainerResourceChange(
final AMRMClient<ContainerRequest> amClient, List<Container> containers)
throws YarnException, IOException {
Assert.assertEquals(3, containers.size());
assertEquals(3, containers.size());
// remember the container IDs
Container container1 = containers.get(0);
Container container2 = containers.get(1);
Container container3 = containers.get(2);
AMRMClientImpl<ContainerRequest> amClientImpl =
(AMRMClientImpl<ContainerRequest>) amClient;
Assert.assertEquals(0, amClientImpl.change.size());
assertEquals(0, amClientImpl.change.size());
// verify newer request overwrites older request for the container1
amClientImpl.requestContainerUpdate(container1,
UpdateContainerRequest.newInstance(container1.getVersion(),
@ -1100,21 +1107,21 @@ private void doContainerResourceChange(
UpdateContainerRequest.newInstance(container1.getVersion(),
container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
Resource.newInstance(4096, 1), null));
Assert.assertEquals(Resource.newInstance(4096, 1),
assertEquals(Resource.newInstance(4096, 1),
amClientImpl.change.get(container1.getId()).getValue().getCapability());
// verify new decrease request cancels old increase request for container1
amClientImpl.requestContainerUpdate(container1,
UpdateContainerRequest.newInstance(container1.getVersion(),
container1.getId(), ContainerUpdateType.DECREASE_RESOURCE,
Resource.newInstance(512, 1), null));
Assert.assertEquals(Resource.newInstance(512, 1),
assertEquals(Resource.newInstance(512, 1),
amClientImpl.change.get(container1.getId()).getValue().getCapability());
// request resource increase for container2
amClientImpl.requestContainerUpdate(container2,
UpdateContainerRequest.newInstance(container2.getVersion(),
container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
Resource.newInstance(2048, 1), null));
Assert.assertEquals(Resource.newInstance(2048, 1),
assertEquals(Resource.newInstance(2048, 1),
amClientImpl.change.get(container2.getId()).getValue().getCapability());
// verify release request will cancel pending change requests for the same
// container
@ -1122,27 +1129,357 @@ private void doContainerResourceChange(
UpdateContainerRequest.newInstance(container3.getVersion(),
container3.getId(), ContainerUpdateType.INCREASE_RESOURCE,
Resource.newInstance(2048, 1), null));
Assert.assertEquals(3, amClientImpl.pendingChange.size());
assertEquals(3, amClientImpl.pendingChange.size());
amClientImpl.releaseAssignedContainer(container3.getId());
Assert.assertEquals(2, amClientImpl.pendingChange.size());
assertEquals(2, amClientImpl.pendingChange.size());
// as of now: container1 asks to decrease to (512, 1)
// container2 asks to increase to (2048, 1)
// send allocation requests
AllocateResponse allocResponse = amClient.allocate(0.1f);
Assert.assertEquals(0, amClientImpl.change.size());
assertEquals(0, amClientImpl.change.size());
// we should get decrease confirmation right away
List<UpdatedContainer> updatedContainers =
allocResponse.getUpdatedContainers();
Assert.assertEquals(1, updatedContainers.size());
assertEquals(1, updatedContainers.size());
// we should get increase allocation after the next NM's heartbeat to RM
triggerSchedulingWithNMHeartBeat();
// get allocations
allocResponse = amClient.allocate(0.1f);
updatedContainers =
allocResponse.getUpdatedContainers();
Assert.assertEquals(1, updatedContainers.size());
assertEquals(1, updatedContainers.size());
}
@Test(timeout=60000)
public void testAMRMClientWithContainerPromotion()
throws YarnException, IOException {
AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
(AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
.createAMRMClient();
//asserting we are not using the singleton instance cache
Assert.assertSame(NMTokenCache.getSingleton(),
amClient.getNMTokenCache());
amClient.init(conf);
amClient.start();
// start am nm client
NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
Assert.assertNotNull(nmClient);
// asserting we are using the singleton instance cache
Assert.assertSame(
NMTokenCache.getSingleton(), nmClient.getNMTokenCache());
nmClient.init(conf);
nmClient.start();
assertEquals(STATE.STARTED, nmClient.getServiceState());
amClient.registerApplicationMaster("Host", 10000, "");
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
// START OPPORTUNISTIC Container, Send allocation request to RM
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
true, null, ExecutionTypeRequest
.newInstance(ExecutionType.OPPORTUNISTIC, true)));
int oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
assertEquals(1, oppContainersRequestedAny);
assertEquals(1, amClient.ask.size());
assertEquals(0, amClient.release.size());
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
int iterationsLeft = 50;
amClient.getNMTokenCache().clearCache();
assertEquals(0,
amClient.getNMTokenCache().numberOfTokensInCache());
AllocateResponse allocResponse = null;
while (allocatedContainerCount < oppContainersRequestedAny
&& iterationsLeft-- > 0) {
allocResponse = amClient.allocate(0.1f);
// let NM heartbeat to RM and trigger allocations
//triggerSchedulingWithNMHeartBeat();
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
allocatedContainerCount +=
allocResponse.getAllocatedContainers().size();
for (Container container : allocResponse.getAllocatedContainers()) {
if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
allocatedOpportContainers.put(container.getId(), container);
}
}
if (allocatedContainerCount < oppContainersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
assertEquals(oppContainersRequestedAny, allocatedContainerCount);
assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
startContainer(allocResponse, nmClient);
// SEND PROMOTION REQUEST TO RM
try {
Container c = allocatedOpportContainers.values().iterator().next();
amClient.requestContainerUpdate(
c, UpdateContainerRequest.newInstance(c.getVersion(),
c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.OPPORTUNISTIC));
fail("Should throw Exception..");
} catch (IllegalArgumentException e) {
System.out.println("## " + e.getMessage());
assertTrue(e.getMessage().contains(
"target should be GUARANTEED and original should be OPPORTUNISTIC"));
}
Container c = allocatedOpportContainers.values().iterator().next();
amClient.requestContainerUpdate(
c, UpdateContainerRequest.newInstance(c.getVersion(),
c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED));
iterationsLeft = 120;
Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
// do a few iterations to ensure RM is not going to send new containers
while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
// inform RM of rejection
allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
if (allocResponse.getUpdatedContainers() != null) {
for (UpdatedContainer updatedContainer : allocResponse
.getUpdatedContainers()) {
System.out.println("Got update..");
updatedContainers.put(updatedContainer.getContainer().getId(),
updatedContainer);
}
}
if (iterationsLeft > 0) {
// sleep to make sure NM's heartbeat
sleep(100);
}
}
assertEquals(1, updatedContainers.size());
for (ContainerId cId : allocatedOpportContainers.keySet()) {
Container orig = allocatedOpportContainers.get(cId);
UpdatedContainer updatedContainer = updatedContainers.get(cId);
assertNotNull(updatedContainer);
assertEquals(ExecutionType.GUARANTEED,
updatedContainer.getContainer().getExecutionType());
assertEquals(orig.getResource(),
updatedContainer.getContainer().getResource());
assertEquals(orig.getNodeId(),
updatedContainer.getContainer().getNodeId());
assertEquals(orig.getVersion() + 1,
updatedContainer.getContainer().getVersion());
}
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
// SEND UPDATE EXECTYPE UPDATE TO NM
updateContainerExecType(allocResponse, ExecutionType.GUARANTEED, nmClient);
amClient.ask.clear();
}
@Test(timeout=60000)
public void testAMRMClientWithContainerDemotion()
throws YarnException, IOException {
AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
(AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
.createAMRMClient();
//asserting we are not using the singleton instance cache
Assert.assertSame(NMTokenCache.getSingleton(),
amClient.getNMTokenCache());
amClient.init(conf);
amClient.start();
NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
Assert.assertNotNull(nmClient);
// asserting we are using the singleton instance cache
Assert.assertSame(
NMTokenCache.getSingleton(), nmClient.getNMTokenCache());
nmClient.init(conf);
nmClient.start();
assertEquals(STATE.STARTED, nmClient.getServiceState());
amClient.registerApplicationMaster("Host", 10000, "");
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
// START OPPORTUNISTIC Container, Send allocation request to RM
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
true, null, ExecutionTypeRequest
.newInstance(ExecutionType.GUARANTEED, true)));
int oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
assertEquals(1, oppContainersRequestedAny);
assertEquals(1, amClient.ask.size());
assertEquals(0, amClient.release.size());
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
Map<ContainerId, Container> allocatedGuaranteedContainers = new HashMap<>();
int iterationsLeft = 50;
amClient.getNMTokenCache().clearCache();
assertEquals(0,
amClient.getNMTokenCache().numberOfTokensInCache());
AllocateResponse allocResponse = null;
while (allocatedContainerCount < oppContainersRequestedAny
&& iterationsLeft-- > 0) {
allocResponse = amClient.allocate(0.1f);
// let NM heartbeat to RM and trigger allocations
//triggerSchedulingWithNMHeartBeat();
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
allocatedContainerCount +=
allocResponse.getAllocatedContainers().size();
for (Container container : allocResponse.getAllocatedContainers()) {
if (container.getExecutionType() == ExecutionType.GUARANTEED) {
allocatedGuaranteedContainers.put(container.getId(), container);
}
}
if (allocatedContainerCount < oppContainersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
assertEquals(oppContainersRequestedAny, allocatedContainerCount);
assertEquals(oppContainersRequestedAny,
allocatedGuaranteedContainers.size());
startContainer(allocResponse, nmClient);
// SEND DEMOTION REQUEST TO RM
try {
Container c = allocatedGuaranteedContainers.values().iterator().next();
amClient.requestContainerUpdate(
c, UpdateContainerRequest.newInstance(c.getVersion(),
c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED));
fail("Should throw Exception..");
} catch (IllegalArgumentException e) {
System.out.println("## " + e.getMessage());
assertTrue(e.getMessage().contains(
"target should be OPPORTUNISTIC and original should be GUARANTEED"));
}
Container c = allocatedGuaranteedContainers.values().iterator().next();
amClient.requestContainerUpdate(
c, UpdateContainerRequest.newInstance(c.getVersion(),
c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
null, ExecutionType.OPPORTUNISTIC));
iterationsLeft = 120;
Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
// do a few iterations to ensure RM is not going to send new containers
while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
// inform RM of rejection
allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
if (allocResponse.getUpdatedContainers() != null) {
for (UpdatedContainer updatedContainer : allocResponse
.getUpdatedContainers()) {
System.out.println("Got update..");
updatedContainers.put(updatedContainer.getContainer().getId(),
updatedContainer);
}
}
if (iterationsLeft > 0) {
// sleep to make sure NM's heartbeat
sleep(100);
}
}
assertEquals(1, updatedContainers.size());
for (ContainerId cId : allocatedGuaranteedContainers.keySet()) {
Container orig = allocatedGuaranteedContainers.get(cId);
UpdatedContainer updatedContainer = updatedContainers.get(cId);
assertNotNull(updatedContainer);
assertEquals(ExecutionType.OPPORTUNISTIC,
updatedContainer.getContainer().getExecutionType());
assertEquals(orig.getResource(),
updatedContainer.getContainer().getResource());
assertEquals(orig.getNodeId(),
updatedContainer.getContainer().getNodeId());
assertEquals(orig.getVersion() + 1,
updatedContainer.getContainer().getVersion());
}
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
updateContainerExecType(allocResponse, ExecutionType.OPPORTUNISTIC,
nmClient);
amClient.ask.clear();
}
private void updateContainerExecType(AllocateResponse allocResponse,
ExecutionType expectedExecType, NMClientImpl nmClient)
throws IOException, YarnException {
for (UpdatedContainer updatedContainer : allocResponse
.getUpdatedContainers()) {
Container container = updatedContainer.getContainer();
nmClient.increaseContainerResource(container);
// NodeManager may still need some time to get the stable
// container status
while (true) {
ContainerStatus status = nmClient
.getContainerStatus(container.getId(), container.getNodeId());
if (status.getExecutionType() == expectedExecType) {
break;
}
sleep(10);
}
}
}
private void startContainer(AllocateResponse allocResponse,
NMClientImpl nmClient) throws IOException, YarnException {
// START THE CONTAINER IN NM
// build container launch context
Credentials ts = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
// start a process long enough for increase/decrease action to take effect
ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext(
Collections.<String, LocalResource>emptyMap(),
new HashMap<String, String>(), Arrays.asList("sleep", "100"),
new HashMap<String, ByteBuffer>(), securityTokens,
new HashMap<ApplicationAccessType, String>());
// start the containers and make sure they are in RUNNING state
for (Container container : allocResponse.getAllocatedContainers()) {
nmClient.startContainer(container, clc);
// NodeManager may still need some time to get the stable
// container status
while (true) {
ContainerStatus status = nmClient
.getContainerStatus(container.getId(), container.getNodeId());
if (status.getState() == ContainerState.RUNNING) {
break;
}
sleep(10);
}
}
}
private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
throws YarnException, IOException {
// setup container request
@ -1172,7 +1509,7 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
Set<ContainerId> releases = new TreeSet<ContainerId>();
amClient.getNMTokenCache().clearCache();
Assert.assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache());
assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache());
HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
while (allocatedContainerCount < containersRequestedAny
@ -1192,7 +1529,7 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
for (NMToken token : allocResponse.getNMTokens()) {
String nodeID = token.getNodeId().toString();
if (receivedNMTokens.containsKey(nodeID)) {
Assert.fail("Received token again for : " + nodeID);
fail("Received token again for : " + nodeID);
}
receivedNMTokens.put(nodeID, token.getToken());
}
@ -1204,7 +1541,7 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
}
// Should receive atleast 1 token
Assert.assertTrue(receivedNMTokens.size() > 0
assertTrue(receivedNMTokens.size() > 0
&& receivedNMTokens.size() <= nodeCount);
assertEquals(allocatedContainerCount, containersRequestedAny);
@ -1444,7 +1781,7 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException,
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_1 =
getAMRMToken();
Assert.assertNotNull(amrmToken_1);
Assert.assertEquals(amrmToken_1.decodeIdentifier().getKeyId(),
assertEquals(amrmToken_1.decodeIdentifier().getKeyId(),
amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
// Wait for enough time and make sure the roll_over happens
@ -1459,7 +1796,7 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException,
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_2 =
getAMRMToken();
Assert.assertNotNull(amrmToken_2);
Assert.assertEquals(amrmToken_2.decodeIdentifier().getKeyId(),
assertEquals(amrmToken_2.decodeIdentifier().getKeyId(),
amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
Assert.assertNotEquals(amrmToken_1, amrmToken_2);
@ -1474,7 +1811,7 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException,
AMRMTokenIdentifierForTest newVersionTokenIdentifier =
new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message");
Assert.assertEquals("Message is changed after set to newVersionTokenIdentifier",
assertEquals("Message is changed after set to newVersionTokenIdentifier",
"message", newVersionTokenIdentifier.getMessage());
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newVersionToken =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> (
@ -1530,10 +1867,10 @@ public ApplicationMasterProtocol run() {
.getBindAddress(), conf);
}
}).allocate(Records.newRecord(AllocateRequest.class));
Assert.fail("The old Token should not work");
fail("The old Token should not work");
} catch (Exception ex) {
Assert.assertTrue(ex instanceof InvalidToken);
Assert.assertTrue(ex.getMessage().contains(
assertTrue(ex instanceof InvalidToken);
assertTrue(ex.getMessage().contains(
"Invalid AMRMToken from "
+ amrmToken_2.decodeIdentifier().getApplicationAttemptId()));
}
@ -1560,7 +1897,7 @@ public ApplicationMasterProtocol run() {
org.apache.hadoop.security.token.Token<?> token = iter.next();
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
if (result != null) {
Assert.fail("credentials has more than one AMRM token."
fail("credentials has more than one AMRM token."
+ " token1: " + result + " token2: " + token);
}
result = (org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>)

View File

@ -301,7 +301,6 @@ private void testContainerManagement(NMClientImpl nmClient,
assertTrue("The thrown exception is not expected",
e.getMessage().contains("is not handled by this NodeManager"));
}
// increaseContainerResource shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
@ -475,10 +474,10 @@ private void testIncreaseContainerResource(Container container)
try {
nmClient.increaseContainerResource(container);
} catch (YarnException e) {
// NM container will only be in SCHEDULED state, so expect the increase
// action to fail.
// NM container increase container resource should fail without a version
// increase action to fail.
if (!e.getMessage().contains(
"can only be changed when a container is in RUNNING state")) {
container.getId() + " has update version ")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));

View File

@ -66,6 +66,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@ -136,13 +137,14 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ChangeMonitoringContainerResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@ -410,8 +412,24 @@ private void recoverContainer(RecoveredContainerState rcs)
throws IOException {
StartContainerRequest req = rcs.getStartRequest();
ContainerLaunchContext launchContext = req.getContainerLaunchContext();
ContainerTokenIdentifier token =
ContainerTokenIdentifier token = null;
if(rcs.getCapability() != null) {
ContainerTokenIdentifier originalToken =
BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
token = new ContainerTokenIdentifier(originalToken.getContainerID(),
originalToken.getVersion(), originalToken.getNmHostAddress(),
originalToken.getApplicationSubmitter(), rcs.getCapability(),
originalToken.getExpiryTimeStamp(), originalToken.getMasterKeyId(),
originalToken.getRMIdentifier(), originalToken.getPriority(),
originalToken.getCreationTime(),
originalToken.getLogAggregationContext(),
originalToken.getNodeLabelExpression(),
originalToken.getContainerType(), originalToken.getExecutionType());
} else {
token = BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
}
ContainerId containerId = token.getContainerID();
ApplicationId appId =
containerId.getApplicationAttemptId().getApplicationId();
@ -1183,9 +1201,7 @@ public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
// as container resource increase request will have come with
// an updated NMToken.
updateNMTokenIdentifier(nmTokenIdentifier);
Resource resource = containerTokenIdentifier.getResource();
changeContainerResourceInternal(containerId,
containerTokenIdentifier.getVersion(), resource, true);
updateContainerInternal(containerId, containerTokenIdentifier);
successfullyUpdatedContainers.add(containerId);
} catch (YarnException | InvalidToken e) {
failedContainers.put(containerId, SerializedException.newInstance(e));
@ -1199,8 +1215,8 @@ public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
}
@SuppressWarnings("unchecked")
private void changeContainerResourceInternal(ContainerId containerId,
int containerVersion, Resource targetResource, boolean increase)
private void updateContainerInternal(ContainerId containerId,
ContainerTokenIdentifier containerTokenIdentifier)
throws YarnException, IOException {
Container container = context.getContainers().get(containerId);
// Check container existence
@ -1213,64 +1229,77 @@ private void changeContainerResourceInternal(ContainerId containerId,
+ " is not handled by this NodeManager");
}
}
// Check container version.
int currentVersion = container.getContainerTokenIdentifier().getVersion();
if (containerTokenIdentifier.getVersion() <= currentVersion) {
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " has update version [" + currentVersion + "] >= requested version"
+ " [" + containerTokenIdentifier.getVersion() + "]");
}
// Check container state
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState currentState =
container.getContainerState();
if (currentState != org.apache.hadoop.yarn.server.
nodemanager.containermanager.container.ContainerState.RUNNING) {
nodemanager.containermanager.container.ContainerState.RUNNING &&
currentState != org.apache.hadoop.yarn.server.
nodemanager.containermanager.container.ContainerState.SCHEDULED) {
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " is in " + currentState.name() + " state."
+ " Resource can only be changed when a container is in"
+ " RUNNING state");
+ " RUNNING or SCHEDULED state");
}
// Check validity of the target resource.
Resource currentResource = container.getResource();
if (currentResource.equals(targetResource)) {
LOG.warn("Unable to change resource for container "
+ containerId.toString()
+ ". The target resource "
+ targetResource.toString()
+ " is the same as the current resource");
return;
ExecutionType currentExecType =
container.getContainerTokenIdentifier().getExecutionType();
boolean isResourceChange = false;
boolean isExecTypeUpdate = false;
Resource targetResource = containerTokenIdentifier.getResource();
ExecutionType targetExecType = containerTokenIdentifier.getExecutionType();
// Is true if either the resources has increased or execution type
// updated from opportunistic to guaranteed
boolean isIncrease = false;
if (!currentResource.equals(targetResource)) {
isResourceChange = true;
isIncrease = Resources.fitsIn(currentResource, targetResource)
&& !Resources.fitsIn(targetResource, currentResource);
} else if (!currentExecType.equals(targetExecType)) {
isExecTypeUpdate = true;
isIncrease = currentExecType == ExecutionType.OPPORTUNISTIC &&
targetExecType == ExecutionType.GUARANTEED;
}
if (increase && !Resources.fitsIn(currentResource, targetResource)) {
throw RPCUtil.getRemoteException("Unable to increase resource for "
+ "container " + containerId.toString()
+ ". The target resource "
+ targetResource.toString()
+ " is smaller than the current resource "
+ currentResource.toString());
}
if (!increase &&
(!Resources.fitsIn(Resources.none(), targetResource)
|| !Resources.fitsIn(targetResource, currentResource))) {
throw RPCUtil.getRemoteException("Unable to decrease resource for "
+ "container " + containerId.toString()
+ ". The target resource "
+ targetResource.toString()
+ " is not smaller than the current resource "
+ currentResource.toString());
}
if (increase) {
org.apache.hadoop.yarn.api.records.Container increasedContainer =
if (isIncrease) {
org.apache.hadoop.yarn.api.records.Container increasedContainer = null;
if (isResourceChange) {
increasedContainer =
org.apache.hadoop.yarn.api.records.Container.newInstance(
containerId, null, null, targetResource, null, null);
containerId, null, null, targetResource, null, null,
currentExecType);
} else {
increasedContainer =
org.apache.hadoop.yarn.api.records.Container.newInstance(
containerId, null, null, currentResource, null, null,
targetExecType);
}
if (context.getIncreasedContainers().putIfAbsent(containerId,
increasedContainer) != null){
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " resource is being increased.");
+ " resource is being increased -or- " +
"is undergoing ExecutionType promoted.");
}
}
this.readLock.lock();
try {
if (!serviceStopped) {
// Persist container resource change for recovery
this.context.getNMStateStore().storeContainerResourceChanged(
containerId, containerVersion, targetResource);
getContainersMonitor().handle(
new ChangeMonitoringContainerResourceEvent(
containerId, targetResource));
// Dispatch message to ContainerScheduler to actually
// make the change.
dispatcher.getEventHandler().handle(new UpdateContainerSchedulerEvent(
container, containerTokenIdentifier, isResourceChange,
isExecTypeUpdate, isIncrease));
} else {
throw new YarnException(
"Unable to change container resource as the NodeManager is "
@ -1571,8 +1600,11 @@ public void handle(ContainerManagerEvent event) {
for (org.apache.hadoop.yarn.api.records.Container container
: containersDecreasedEvent.getContainersToDecrease()) {
try {
changeContainerResourceInternal(container.getId(),
container.getVersion(), container.getResource(), false);
ContainerTokenIdentifier containerTokenIdentifier =
BuilderUtils.newContainerTokenIdentifier(
container.getContainerToken());
updateContainerInternal(container.getId(),
containerTokenIdentifier);
} catch (YarnException e) {
LOG.error("Unable to decrease container resource", e);
} catch (IOException e) {

View File

@ -39,10 +39,10 @@ public interface Container extends EventHandler<ContainerEvent> {
Resource getResource();
void setResource(Resource targetResource);
ContainerTokenIdentifier getContainerTokenIdentifier();
void setContainerTokenIdentifier(ContainerTokenIdentifier token);
String getUser();
ContainerState getContainerState();

View File

@ -148,9 +148,8 @@ private ReInitializationContext createContextForRollback() {
private final Credentials credentials;
private final NodeManagerMetrics metrics;
private volatile ContainerLaunchContext launchContext;
private final ContainerTokenIdentifier containerTokenIdentifier;
private volatile ContainerTokenIdentifier containerTokenIdentifier;
private final ContainerId containerId;
private volatile Resource resource;
private final String user;
private int version;
private int exitCode = ContainerExitStatus.INVALID;
@ -201,7 +200,6 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
this.containerTokenIdentifier = containerTokenIdentifier;
this.containerId = containerTokenIdentifier.getContainerID();
this.resource = containerTokenIdentifier.getResource();
this.diagnostics = new StringBuilder();
this.credentials = creds;
this.metrics = metrics;
@ -269,13 +267,6 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.exitCode = rcs.getExitCode();
this.recoveredAsKilled = rcs.getKilled();
this.diagnostics.append(rcs.getDiagnostics());
Resource recoveredCapability = rcs.getCapability();
if (recoveredCapability != null
&& !this.resource.equals(recoveredCapability)) {
// resource capability had been updated before NM was down
this.resource = Resource.newInstance(recoveredCapability.getMemorySize(),
recoveredCapability.getVirtualCores());
}
this.version = rcs.getVersion();
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
this.workDir = rcs.getWorkDir();
@ -640,14 +631,8 @@ public ContainerId getContainerId() {
@Override
public Resource getResource() {
return Resources.clone(this.resource);
}
@Override
public void setResource(Resource targetResource) {
Resource currentResource = getResource();
this.resource = Resources.clone(targetResource);
this.metrics.changeContainer(currentResource, targetResource);
return Resources.clone(
this.containerTokenIdentifier.getResource());
}
@Override
@ -660,6 +645,16 @@ public ContainerTokenIdentifier getContainerTokenIdentifier() {
}
}
@Override
public void setContainerTokenIdentifier(ContainerTokenIdentifier token) {
this.writeLock.lock();
try {
this.containerTokenIdentifier = token;
} finally {
this.writeLock.unlock();
}
}
@Override
public String getWorkDir() {
return workDir;
@ -833,7 +828,8 @@ public ContainerState transition(ContainerImpl container,
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
container.metrics.releaseContainer(container.resource);
container.metrics.releaseContainer(
container.containerTokenIdentifier.getResource());
container.sendFinishedEvents();
return ContainerState.DONE;
}
@ -1517,7 +1513,8 @@ static class ContainerDoneTransition implements
@Override
@SuppressWarnings("unchecked")
public void transition(ContainerImpl container, ContainerEvent event) {
container.metrics.releaseContainer(container.resource);
container.metrics.releaseContainer(
container.containerTokenIdentifier.getResource());
if (container.containerMetrics != null) {
container.containerMetrics
.recordFinishTimeAndExitCode(clock.getTime(), container.exitCode);

View File

@ -741,19 +741,6 @@ private String formatUsageString(long currentVmemUsage, long vmemLimit,
}
}
private void changeContainerResource(
ContainerId containerId, Resource resource) {
Container container = context.getContainers().get(containerId);
// Check container existence
if (container == null) {
LOG.warn("Container " + containerId.toString() + "does not exist");
return;
}
// YARN-5860: Route this through the ContainerScheduler to
// fix containerAllocation
container.setResource(resource);
}
private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
if (!containerMetricsEnabled || monitoringEvent == null) {
return;
@ -902,8 +889,6 @@ private void onChangeMonitoringContainerResource(
int cpuVcores = changeEvent.getResource().getVirtualCores();
processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
}
changeContainerResource(containerId, changeEvent.getResource());
}
private void onStopMonitoringContainer(

View File

@ -31,6 +31,9 @@
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
.ChangeMonitoringContainerResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
@ -136,6 +139,13 @@ public void handle(ContainerSchedulerEvent event) {
case CONTAINER_COMPLETED:
onContainerCompleted(event.getContainer());
break;
case UPDATE_CONTAINER:
if (event instanceof UpdateContainerSchedulerEvent) {
onUpdateContainer((UpdateContainerSchedulerEvent) event);
} else {
LOG.error("Unknown event type on UpdateCOntainer: " + event.getType());
}
break;
case SHED_QUEUED_CONTAINERS:
shedQueuedOpportunisticContainers();
break;
@ -145,6 +155,69 @@ public void handle(ContainerSchedulerEvent event) {
}
}
/**
* We assume that the ContainerManager has already figured out what kind
* of update this is.
*/
private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
ContainerId containerId = updateEvent.getContainer().getContainerId();
if (updateEvent.isResourceChange()) {
if (runningContainers.containsKey(containerId)) {
this.utilizationTracker.subtractContainerResource(
updateEvent.getContainer());
updateEvent.getContainer().setContainerTokenIdentifier(
updateEvent.getUpdatedToken());
this.utilizationTracker.addContainerResources(
updateEvent.getContainer());
getContainersMonitor().handle(
new ChangeMonitoringContainerResourceEvent(containerId,
updateEvent.getUpdatedToken().getResource()));
} else {
updateEvent.getContainer().setContainerTokenIdentifier(
updateEvent.getUpdatedToken());
}
try {
// Persist change in the state store.
this.context.getNMStateStore().storeContainerResourceChanged(
containerId,
updateEvent.getUpdatedToken().getVersion(),
updateEvent.getUpdatedToken().getResource());
} catch (IOException e) {
LOG.warn("Could not store container [" + containerId + "] resource " +
"change..", e);
}
}
if (updateEvent.isExecTypeUpdate()) {
updateEvent.getContainer().setContainerTokenIdentifier(
updateEvent.getUpdatedToken());
// If this is a running container.. just change the execution type
// and be done with it.
if (!runningContainers.containsKey(containerId)) {
// Promotion or not (Increase signifies either a promotion
// or container size increase)
if (updateEvent.isIncrease()) {
// Promotion of queued container..
if (queuedOpportunisticContainers.remove(containerId) != null) {
queuedGuaranteedContainers.put(containerId,
updateEvent.getContainer());
}
//Kill opportunistic containers if any to make room for
// promotion request
killOpportunisticContainers(updateEvent.getContainer());
} else {
// Demotion of queued container.. Should not happen too often
// since you should not find too many queued guaranteed
// containers
if (queuedGuaranteedContainers.remove(containerId) != null) {
queuedOpportunisticContainers.put(containerId,
updateEvent.getContainer());
}
}
}
}
}
/**
* Return number of queued containers.
* @return Number of queued containers.

View File

@ -24,6 +24,7 @@
public enum ContainerSchedulerEventType {
SCHEDULE_CONTAINER,
CONTAINER_COMPLETED,
UPDATE_CONTAINER,
// Producer: Node HB response - RM has asked to shed the queue
SHED_QUEUED_CONTAINERS,
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.Container;
/**
* Update Event consumed by the {@link ContainerScheduler}.
*/
public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
private ContainerTokenIdentifier updatedToken;
private boolean isResourceChange;
private boolean isExecTypeUpdate;
private boolean isIncrease;
/**
* Create instance of Event.
*
* @param originalContainer Original Container.
* @param updatedToken Updated Container Token.
* @param isResourceChange is this a Resource Change.
* @param isExecTypeUpdate is this an ExecTypeUpdate.
* @param isIncrease is this a Container Increase.
*/
public UpdateContainerSchedulerEvent(Container originalContainer,
ContainerTokenIdentifier updatedToken, boolean isResourceChange,
boolean isExecTypeUpdate, boolean isIncrease) {
super(originalContainer, ContainerSchedulerEventType.UPDATE_CONTAINER);
this.updatedToken = updatedToken;
this.isResourceChange = isResourceChange;
this.isExecTypeUpdate = isExecTypeUpdate;
this.isIncrease = isIncrease;
}
/**
* Update Container Token.
*
* @return Container Token.
*/
public ContainerTokenIdentifier getUpdatedToken() {
return updatedToken;
}
/**
* isResourceChange.
* @return isResourceChange.
*/
public boolean isResourceChange() {
return isResourceChange;
}
/**
* isExecTypeUpdate.
* @return isExecTypeUpdate.
*/
public boolean isExecTypeUpdate() {
return isExecTypeUpdate;
}
/**
* isIncrease.
* @return isIncrease.
*/
public boolean isIncrease() {
return isIncrease;
}
}

View File

@ -682,7 +682,7 @@ public void run() {
try{
try {
updateBarrier.await();
increaseTokens.add(getContainerToken(targetResource));
increaseTokens.add(getContainerToken(targetResource, 1));
ContainerUpdateRequest updateRequest =
ContainerUpdateRequest.newInstance(increaseTokens);
ContainerUpdateResponse updateResponse =
@ -710,6 +710,15 @@ private Token getContainerToken(Resource resource) throws IOException {
getNMContext().getNodeId(), user, resource,
getNMContext().getContainerTokenSecretManager(), null);
}
private Token getContainerToken(Resource resource, int version)
throws IOException {
ContainerId cId = TestContainerManager.createContainerId(0);
return TestContainerManager.createContainerToken(
cId, version, DUMMY_RM_IDENTIFIER,
getNMContext().getNodeId(), user, resource,
getNMContext().getContainerTokenSecretManager(), null);
}
}
public static NMContainerStatus createNMContainerStatus(int id,

View File

@ -421,6 +421,20 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
containerTokenIdentifier);
}
public static Token createContainerToken(ContainerId cId, int version,
long rmIdentifier, NodeId nodeId, String user, Resource resource,
NMContainerTokenSecretManager containerTokenSecretManager,
LogAggregationContext logAggregationContext) throws IOException {
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, version, nodeId.toString(), user,
resource, System.currentTimeMillis() + 100000L, 123, rmIdentifier,
Priority.newInstance(0), 0, logAggregationContext, null,
ContainerType.TASK, ExecutionType.GUARANTEED);
return BuilderUtils.newContainerToken(nodeId,
containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
containerTokenIdentifier);
}
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user, Resource resource,
NMContainerTokenSecretManager containerTokenSecretManager,
@ -431,8 +445,23 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
Priority.newInstance(0), 0, logAggregationContext, null,
ContainerType.TASK, executionType);
return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
.retrievePassword(containerTokenIdentifier),
return BuilderUtils.newContainerToken(nodeId,
containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
containerTokenIdentifier);
}
public static Token createContainerToken(ContainerId cId, int version,
long rmIdentifier, NodeId nodeId, String user, Resource resource,
NMContainerTokenSecretManager containerTokenSecretManager,
LogAggregationContext logAggregationContext, ExecutionType executionType)
throws IOException {
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, version, nodeId.toString(), user,
resource, System.currentTimeMillis() + 100000L, 123, rmIdentifier,
Priority.newInstance(0), 0, logAggregationContext, null,
ContainerType.TASK, executionType);
return BuilderUtils.newContainerToken(nodeId,
containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
containerTokenIdentifier);
}

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
@ -70,6 +71,7 @@
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -80,14 +82,15 @@
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@ -100,6 +103,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
@ -117,9 +121,33 @@ public TestContainerManager() throws UnsupportedFileSystemException {
LOG = LogFactory.getLog(TestContainerManager.class);
}
private boolean delayContainers = false;
@Override
protected ContainerExecutor createContainerExecutor() {
DefaultContainerExecutor exec = new DefaultContainerExecutor() {
@Override
public int launchContainer(ContainerStartContext ctx)
throws IOException, ConfigurationException {
if (delayContainers) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// Nothing..
}
}
return super.launchContainer(ctx);
}
};
exec.setConf(conf);
return spy(exec);
}
@Override
@Before
public void setup() throws IOException {
conf.setInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
super.setup();
}
@ -1468,7 +1496,7 @@ public void testNullTokens() throws Exception {
Assert.assertEquals(strExceptionMsg,
ContainerManagerImpl.INVALID_NMTOKEN_MSG);
ContainerManagerImpl spyContainerMgr = Mockito.spy(cMgrImpl);
ContainerManagerImpl spyContainerMgr = spy(cMgrImpl);
UserGroupInformation ugInfo = UserGroupInformation.createRemoteUser("a");
Mockito.when(spyContainerMgr.getRemoteUgi()).thenReturn(ugInfo);
Mockito.when(spyContainerMgr.
@ -1543,7 +1571,7 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception
// container will have exited, and won't be in RUNNING state
ContainerId cId0 = createContainerId(0);
Token containerToken =
createContainerToken(cId0, DUMMY_RM_IDENTIFIER,
createContainerToken(cId0, 1, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user,
Resource.newInstance(1234, 3),
context.getContainerTokenSecretManager(), null);
@ -1572,7 +1600,7 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception
if (cId0.equals(entry.getKey())) {
Assert.assertTrue(entry.getValue().getMessage()
.contains("Resource can only be changed when a "
+ "container is in RUNNING state"));
+ "container is in RUNNING or SCHEDULED state"));
} else if (cId7.equals(entry.getKey())) {
Assert.assertTrue(entry.getValue().getMessage()
.contains("Container " + cId7.toString()
@ -1584,89 +1612,6 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception
}
}
@Test
public void testIncreaseContainerResourceWithInvalidResource() throws Exception {
containerManager.start();
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
PrintWriter fileWriter = new PrintWriter(scriptFile);
// Construct the Container-id
ContainerId cId = createContainerId(0);
if (Shell.WINDOWS) {
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
} else {
fileWriter.write("\numask 0");
fileWriter.write("\nexec sleep 100");
}
fileWriter.close();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
URL resource_alpha =
URL.fromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
LocalResource rsrc_alpha =
recordFactory.newRecordInstance(LocalResource.class);
rsrc_alpha.setResource(resource_alpha);
rsrc_alpha.setSize(-1);
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
rsrc_alpha.setType(LocalResourceType.FILE);
rsrc_alpha.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
List<String> commands =
Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
// Make sure the container reaches RUNNING state
BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING);
// Construct container resource increase request,
List<Token> increaseTokens = new ArrayList<>();
// Add increase request. The increase request should fail
// as the current resource does not fit in the target resource
Token containerToken =
createContainerToken(cId, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user,
Resource.newInstance(512, 1),
context.getContainerTokenSecretManager(), null);
increaseTokens.add(containerToken);
ContainerUpdateRequest updateRequest =
ContainerUpdateRequest.newInstance(increaseTokens);
ContainerUpdateResponse updateResponse =
containerManager.updateContainer(updateRequest);
// Check response
Assert.assertEquals(
0, updateResponse.getSuccessfullyUpdatedContainers().size());
Assert.assertEquals(1, updateResponse.getFailedRequests().size());
for (Map.Entry<ContainerId, SerializedException> entry : updateResponse
.getFailedRequests().entrySet()) {
if (cId.equals(entry.getKey())) {
Assert.assertNotNull("Failed message", entry.getValue().getMessage());
Assert.assertTrue(entry.getValue().getMessage()
.contains("The target resource "
+ Resource.newInstance(512, 1).toString()
+ " is smaller than the current resource "
+ Resource.newInstance(1024, 1)));
} else {
throw new YarnException("Received failed request from wrong"
+ " container: " + entry.getKey().toString());
}
}
}
@Test
public void testChangeContainerResource() throws Exception {
containerManager.start();
@ -1720,7 +1665,7 @@ public void testChangeContainerResource() throws Exception {
List<Token> increaseTokens = new ArrayList<>();
// Add increase request.
Resource targetResource = Resource.newInstance(4096, 2);
Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
Token containerToken = createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user, targetResource,
context.getContainerTokenSecretManager(), null);
increaseTokens.add(containerToken);
@ -1741,15 +1686,19 @@ public void testChangeContainerResource() throws Exception {
// Check status immediately as resource increase is blocking
assertEquals(targetResource, containerStatus.getCapability());
// Simulate a decrease request
List<org.apache.hadoop.yarn.api.records.Container> containersToDecrease
= new ArrayList<>();
List<Token> decreaseTokens = new ArrayList<>();
targetResource = Resource.newInstance(2048, 2);
org.apache.hadoop.yarn.api.records.Container decreasedContainer =
org.apache.hadoop.yarn.api.records.Container
.newInstance(cId, null, null, targetResource, null, null);
containersToDecrease.add(decreasedContainer);
containerManager.handle(
new CMgrDecreaseContainersResourceEvent(containersToDecrease));
Token token = createContainerToken(cId, 2, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user, targetResource,
context.getContainerTokenSecretManager(), null);
decreaseTokens.add(token);
updateRequest = ContainerUpdateRequest.newInstance(decreaseTokens);
updateResponse = containerManager.updateContainer(updateRequest);
Assert.assertEquals(
1, updateResponse.getSuccessfullyUpdatedContainers().size());
Assert.assertTrue(updateResponse.getFailedRequests().isEmpty());
// Check status with retry
containerStatus = containerManager
.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
@ -1879,7 +1828,7 @@ public void testStartContainerFailureWithInvalidLocalResource()
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
ContainerLaunchContext spyContainerLaunchContext =
Mockito.spy(containerLaunchContext);
spy(containerLaunchContext);
Mockito.when(spyContainerLaunchContext.getLocalResources())
.thenReturn(localResources);
@ -1924,7 +1873,7 @@ public void testStartContainerFailureWithNullTypeLocalResource()
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
ContainerLaunchContext spyContainerLaunchContext =
Mockito.spy(containerLaunchContext);
spy(containerLaunchContext);
Mockito.when(spyContainerLaunchContext.getLocalResources())
.thenReturn(localResources);
@ -1969,7 +1918,7 @@ public void testStartContainerFailureWithNullVisibilityLocalResource()
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
ContainerLaunchContext spyContainerLaunchContext =
Mockito.spy(containerLaunchContext);
spy(containerLaunchContext);
Mockito.when(spyContainerLaunchContext.getLocalResources())
.thenReturn(localResources);
@ -1996,4 +1945,122 @@ public void testStartContainerFailureWithNullVisibilityLocalResource()
Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
.contains("Null resource visibility for local resource"));
}
@Test
public void testContainerUpdateExecTypeOpportunisticToGuaranteed()
throws IOException, YarnException, InterruptedException {
delayContainers = true;
containerManager.start();
// Construct the Container-id
ContainerId cId = createContainerId(0);
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC));
List<StartContainerRequest> list = new ArrayList<>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
// Make sure the container reaches RUNNING state
BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING);
// Construct container resource increase request,
List<Token> updateTokens = new ArrayList<>();
Token containerToken =
createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.GUARANTEED);
updateTokens.add(containerToken);
ContainerUpdateRequest updateRequest =
ContainerUpdateRequest.newInstance(updateTokens);
ContainerUpdateResponse updateResponse =
containerManager.updateContainer(updateRequest);
Assert.assertEquals(
1, updateResponse.getSuccessfullyUpdatedContainers().size());
Assert.assertTrue(updateResponse.getFailedRequests().isEmpty());
//Make sure the container is running
List<ContainerId> statList = new ArrayList<ContainerId>();
statList.add(cId);
GetContainerStatusesRequest statRequest =
GetContainerStatusesRequest.newInstance(statList);
List<ContainerStatus> containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
Assert.assertEquals(1, containerStatuses.size());
for (ContainerStatus status : containerStatuses) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
status.getState());
Assert.assertEquals(ExecutionType.GUARANTEED, status.getExecutionType());
}
}
@Test
public void testContainerUpdateExecTypeGuaranteedToOpportunistic()
throws IOException, YarnException, InterruptedException {
delayContainers = true;
containerManager.start();
// Construct the Container-id
ContainerId cId = createContainerId(0);
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null));
List<StartContainerRequest> list = new ArrayList<>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
// Make sure the container reaches RUNNING state
BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING);
// Construct container resource increase request,
List<Token> updateTokens = new ArrayList<>();
Token containerToken =
createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC);
updateTokens.add(containerToken);
ContainerUpdateRequest updateRequest =
ContainerUpdateRequest.newInstance(updateTokens);
ContainerUpdateResponse updateResponse =
containerManager.updateContainer(updateRequest);
Assert.assertEquals(
1, updateResponse.getSuccessfullyUpdatedContainers().size());
Assert.assertTrue(updateResponse.getFailedRequests().isEmpty());
//Make sure the container is running
List<ContainerId> statList = new ArrayList<ContainerId>();
statList.add(cId);
GetContainerStatusesRequest statRequest =
GetContainerStatusesRequest.newInstance(statList);
List<ContainerStatus> containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
Assert.assertEquals(1, containerStatuses.size());
for (ContainerStatus status : containerStatuses) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
status.getState());
Assert
.assertEquals(ExecutionType.OPPORTUNISTIC, status.getExecutionType());
}
}
}

View File

@ -652,7 +652,7 @@ private ContainerUpdateResponse updateContainers(
final List<Token> increaseTokens = new ArrayList<Token>();
// add increase request
Token containerToken = TestContainerManager.createContainerToken(
cid, 0, context.getNodeId(), user.getShortUserName(),
cid, 1, 0, context.getNodeId(), user.getShortUserName(),
capability, context.getContainerTokenSecretManager(), null);
increaseTokens.add(containerToken);
final ContainerUpdateRequest updateRequest =

View File

@ -27,6 +27,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@ -37,6 +39,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -951,4 +954,97 @@ public void testStopQueuedContainer() throws Exception {
map.get(org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED)
.getContainerId());
}
/**
* Starts one OPPORTUNISTIC container that takes up the whole node's
* resources, and submit one more that will be queued. Now promote the
* queued OPPORTUNISTIC container, which should kill the current running
* OPPORTUNISTIC container to make room for the promoted request.
* @throws Exception
*/
@Test
public void testPromotionOfOpportunisticContainers() throws Exception {
containerManager.start();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
List<StartContainerRequest> list = new ArrayList<>();
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(2048, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(1024, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
Thread.sleep(5000);
// Ensure first container is running and others are queued.
List<ContainerId> statList = new ArrayList<ContainerId>();
for (int i = 0; i < 3; i++) {
statList.add(createContainerId(i));
}
GetContainerStatusesRequest statRequest = GetContainerStatusesRequest
.newInstance(Arrays.asList(createContainerId(0)));
List<ContainerStatus> containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
if (status.getContainerId().equals(createContainerId(0))) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
status.getState());
} else {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
status.getState());
}
}
ContainerScheduler containerScheduler =
containerManager.getContainerScheduler();
// Ensure two containers are properly queued.
Assert.assertEquals(1, containerScheduler.getNumQueuedContainers());
Assert.assertEquals(0,
containerScheduler.getNumQueuedGuaranteedContainers());
Assert.assertEquals(1,
containerScheduler.getNumQueuedOpportunisticContainers());
// Promote Queued Opportunistic Container
Token updateToken =
createContainerToken(createContainerId(1), 1, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user, BuilderUtils.newResource(1024, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.GUARANTEED);
List<Token> updateTokens = new ArrayList<Token>();
updateTokens.add(updateToken);
ContainerUpdateRequest updateRequest =
ContainerUpdateRequest.newInstance(updateTokens);
ContainerUpdateResponse updateResponse =
containerManager.updateContainer(updateRequest);
Assert.assertEquals(1,
updateResponse.getSuccessfullyUpdatedContainers().size());
Assert.assertEquals(0, updateResponse.getFailedRequests().size());
waitForContainerState(containerManager, createContainerId(0),
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE);
waitForContainerState(containerManager, createContainerId(1),
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
// Ensure no containers are queued.
Assert.assertEquals(0, containerScheduler.getNumQueuedContainers());
}
}

View File

@ -140,7 +140,7 @@ public Resource getResource() {
}
@Override
public void setResource(Resource targetResource) {
public void setContainerTokenIdentifier(ContainerTokenIdentifier token) {
}
@Override

View File

@ -655,7 +655,7 @@ private Container updateContainerAndNMToken(RMContainer rmContainer,
container.getNodeId(), getUser(), container.getResource(),
container.getPriority(), rmContainer.getCreationTime(),
this.logAggregationContext, rmContainer.getNodeLabelExpression(),
containerType));
containerType, container.getExecutionType()));
updateNMToken(container);
} catch (IllegalArgumentException e) {
// DNS might be down, skip returning this container.

View File

@ -186,6 +186,31 @@ public Token createContainerToken(ContainerId containerId,
null, null, ContainerType.TASK);
}
/**
* Helper function for creating ContainerTokens.
*
* @param containerId containerId.
* @param containerVersion containerVersion.
* @param nodeId nodeId.
* @param appSubmitter appSubmitter.
* @param capability capability.
* @param priority priority.
* @param createTime createTime.
* @param logAggregationContext logAggregationContext.
* @param nodeLabelExpression nodeLabelExpression.
* @param containerType containerType.
* @return the container-token.
*/
public Token createContainerToken(ContainerId containerId,
int containerVersion, NodeId nodeId, String appSubmitter,
Resource capability, Priority priority, long createTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType) {
return createContainerToken(containerId, containerVersion, nodeId,
appSubmitter, capability, priority, createTime, null, null,
ContainerType.TASK, ExecutionType.GUARANTEED);
}
/**
* Helper function for creating ContainerTokens
*
@ -199,13 +224,14 @@ public Token createContainerToken(ContainerId containerId,
* @param logAggregationContext Log Aggregation Context
* @param nodeLabelExpression Node Label Expression
* @param containerType Container Type
* @param execType Execution Type
* @return the container-token
*/
public Token createContainerToken(ContainerId containerId,
int containerVersion, NodeId nodeId, String appSubmitter,
Resource capability, Priority priority, long createTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType) {
ContainerType containerType, ExecutionType execType) {
byte[] password;
ContainerTokenIdentifier tokenIdentifier;
long expiryTimeStamp =
@ -220,7 +246,7 @@ public Token createContainerToken(ContainerId containerId,
this.currentMasterKey.getMasterKey().getKeyId(),
ResourceManager.getClusterTimeStamp(), priority, createTime,
logAggregationContext, nodeLabelExpression, containerType,
ExecutionType.GUARANTEED);
execType);
password = this.createPassword(tokenIdentifier);
} finally {