YARN-1417. Modified RM to generate container-tokens not at creation time, but at allocation time so as to prevent RM
from shelling out containers with expired tokens. Contributed by Omkar Vinit Joshi and Jian He. svn merge --ignore-ancestry -c 1568060 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1568061 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
af2ad815cd
commit
d951cbaa47
@ -254,6 +254,10 @@ Release 2.4.0 - UNRELEASED
|
||||
YARN-1578. Fixed reading incomplete application attempt and container data
|
||||
in FileSystemApplicationHistoryStore. (Shinichi Yamashita via zjshen)
|
||||
|
||||
YARN-1417. Modified RM to generate container-tokens not at creation time, but
|
||||
at allocation time so as to prevent RM from shelling out containers with
|
||||
expired tokens. (Omkar Vinit Joshi and Jian He via vinodkv)
|
||||
|
||||
Release 2.3.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -345,6 +345,11 @@ public synchronized List<Container> pullNewlyAllocatedContainers() {
|
||||
for (RMContainer rmContainer : newlyAllocatedContainers) {
|
||||
rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
|
||||
RMContainerEventType.ACQUIRED));
|
||||
Container container = rmContainer.getContainer();
|
||||
rmContainer.getContainer().setContainerToken(
|
||||
rmContext.getContainerTokenSecretManager().createContainerToken(
|
||||
rmContainer.getContainerId(), container.getNodeId(), getUser(),
|
||||
container.getResource()));
|
||||
returnContainerList.add(rmContainer.getContainer());
|
||||
}
|
||||
newlyAllocatedContainers.clear();
|
||||
|
@ -1292,16 +1292,6 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create <code>ContainerToken</code>, only in secure-mode
|
||||
*/
|
||||
Token createContainerToken(
|
||||
FiCaSchedulerApp application, Container container) {
|
||||
return containerTokenSecretManager.createContainerToken(
|
||||
container.getId(), container.getNodeId(),
|
||||
application.getUser(), container.getResource());
|
||||
}
|
||||
|
||||
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
|
||||
FiCaSchedulerApp application, Priority priority,
|
||||
ResourceRequest request, NodeType type, RMContainer rmContainer) {
|
||||
@ -1345,14 +1335,6 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
|
||||
unreserve(application, priority, node, rmContainer);
|
||||
}
|
||||
|
||||
Token containerToken =
|
||||
createContainerToken(application, container);
|
||||
if (containerToken == null) {
|
||||
// Something went wrong...
|
||||
return Resources.none();
|
||||
}
|
||||
container.setContainerToken(containerToken);
|
||||
|
||||
// Inform the application
|
||||
RMContainer allocatedContainer =
|
||||
application.allocate(type, node, priority, request, container);
|
||||
|
@ -151,17 +151,11 @@ public Container createContainer(
|
||||
NodeId nodeId = node.getRMNode().getNodeID();
|
||||
ContainerId containerId = BuilderUtils.newContainerId(application
|
||||
.getApplicationAttemptId(), application.getNewContainerId());
|
||||
org.apache.hadoop.yarn.api.records.Token containerToken =
|
||||
containerTokenSecretManager.createContainerToken(containerId, nodeId,
|
||||
application.getUser(), capability);
|
||||
if (containerToken == null) {
|
||||
return null; // Try again later.
|
||||
}
|
||||
|
||||
// Create the container
|
||||
Container container =
|
||||
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||
.getHttpAddress(), capability, priority, containerToken);
|
||||
.getHttpAddress(), capability, priority, null);
|
||||
|
||||
return container;
|
||||
}
|
||||
|
@ -654,20 +654,11 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application
|
||||
NodeId nodeId = node.getRMNode().getNodeID();
|
||||
ContainerId containerId = BuilderUtils.newContainerId(application
|
||||
.getApplicationAttemptId(), application.getNewContainerId());
|
||||
Token containerToken = null;
|
||||
|
||||
containerToken =
|
||||
this.rmContext.getContainerTokenSecretManager()
|
||||
.createContainerToken(containerId, nodeId, application.getUser(),
|
||||
capability);
|
||||
if (containerToken == null) {
|
||||
return i; // Try again later.
|
||||
}
|
||||
|
||||
// Create the container
|
||||
Container container =
|
||||
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||
.getHttpAddress(), capability, priority, containerToken);
|
||||
.getHttpAddress(), capability, priority, null);
|
||||
|
||||
// Allocate!
|
||||
|
||||
|
@ -142,8 +142,15 @@ public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
|
||||
public void waitForState(MockNM nm, ContainerId containerId,
|
||||
RMContainerState containerState) throws Exception {
|
||||
RMContainer container = getResourceScheduler().getRMContainer(containerId);
|
||||
Assert.assertNotNull("Container shouldn't be null", container);
|
||||
int timeoutSecs = 0;
|
||||
while(container == null && timeoutSecs++ < 20) {
|
||||
nm.nodeHeartbeat(true);
|
||||
container = getResourceScheduler().getRMContainer(containerId);
|
||||
System.out.println("Waiting for container " + containerId + " to be allocated.");
|
||||
Thread.sleep(100);
|
||||
}
|
||||
Assert.assertNotNull("Container shouldn't be null", container);
|
||||
timeoutSecs = 0;
|
||||
while (!containerState.equals(container.getState()) && timeoutSecs++ < 40) {
|
||||
System.out.println("Container : " + containerId + " State is : "
|
||||
+ container.getState() + " Waiting for state : " + containerState);
|
||||
|
@ -18,11 +18,17 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
@ -30,6 +36,9 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
@ -106,4 +115,38 @@ public void testExcessReservationThanNodeManagerCapacity() throws Exception {
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
// This is to test container tokens are generated when the containers are
|
||||
// acquired by the AM, not when the containers are allocated
|
||||
@Test
|
||||
public void testContainerTokenGeneratedOnPullRequest() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
// request a container.
|
||||
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
|
||||
ContainerId containerId2 =
|
||||
ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
|
||||
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
|
||||
|
||||
RMContainer container =
|
||||
rm1.getResourceScheduler().getRMContainer(containerId2);
|
||||
// no container token is generated.
|
||||
Assert.assertEquals(containerId2, container.getContainerId());
|
||||
Assert.assertNull(container.getContainer().getContainerToken());
|
||||
|
||||
// acquire the container.
|
||||
List<Container> containers =
|
||||
am1.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>()).getAllocatedContainers();
|
||||
Assert.assertEquals(containerId2, containers.get(0).getId());
|
||||
// container token is generated.
|
||||
Assert.assertNotNull(containers.get(0).getContainerToken());
|
||||
rm1.stop();
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user