YARN-713. Fixed ResourceManager to not crash while building tokens when DNS issues happen transmittently. Contributed by Jian He.

svn merge --ignore-ancestry -c 1569979 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1569980 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-19 23:39:48 +00:00
parent df72553cba
commit 5dcbc034f2
15 changed files with 264 additions and 105 deletions

View File

@ -300,6 +300,9 @@ Release 2.4.0 - UNRELEASED
application history store in the transition to the final state. (Contributed
by Zhijie Shen)
YARN-713. Fixed ResourceManager to not crash while building tokens when DNS
issues happen transmittently. (Jian He via vinodkv)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -464,9 +464,11 @@ public AllocateResponse allocate(AllocateRequest request)
blacklistAdditions, blacklistRemovals);
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AllocateResponse allocateResponse =
recordFactory.newRecordInstance(AllocateResponse.class);
if (!allocation.getContainers().isEmpty()) {
allocateResponse.setNMTokens(allocation.getNMTokens());
}
// update the response with the deltas of node status changes
List<RMNode> updatedNodes = new ArrayList<RMNode>();
@ -505,12 +507,6 @@ public AllocateResponse allocate(AllocateRequest request)
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
// Adding NMTokens for allocated containers.
if (!allocation.getContainers().isEmpty()) {
allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
.createAndGetNMTokens(app.getUser(), appAttemptId,
allocation.getContainers()));
}
/*
* As we are updating the response inside the lock object so we don't
* need to worry about unregister call occurring in between (which

View File

@ -78,6 +78,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
@ -202,7 +203,8 @@ RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
// Transitions from SCHEDULED State
.addTransition(RMAppAttemptState.SCHEDULED,
RMAppAttemptState.ALLOCATED_SAVING,
EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.CONTAINER_ALLOCATED,
new AMContainerAllocatedTransition())
.addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING,
@ -769,8 +771,9 @@ public void transition(RMAppAttemptImpl appAttempt,
private static final List<ContainerId> EMPTY_CONTAINER_RELEASE_LIST =
new ArrayList<ContainerId>();
private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST =
new ArrayList<ResourceRequest>();
new ArrayList<ResourceRequest>();
private static final class ScheduleTransition
implements
@ -803,29 +806,57 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
}
}
private static final class AMContainerAllocatedTransition
extends BaseTransition {
private static final class AMContainerAllocatedTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
// Acquire the AM container from the scheduler.
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
EMPTY_CONTAINER_RELEASE_LIST, null, null);
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
null);
// There must be at least one container allocated, because a
// CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
// and is put in SchedulerApplication#newlyAllocatedContainers. Then,
// YarnScheduler#allocate will fetch it.
assert amContainerAllocation.getContainers().size() != 0;
// and is put in SchedulerApplication#newlyAllocatedContainers.
// Note that YarnScheduler#allocate is not guaranteed to be able to
// fetch it since container may not be fetchable for some reason like
// DNS unavailable causing container token not generated. As such, we
// return to the previous state and keep retry until am container is
// fetched.
if (amContainerAllocation.getContainers().size() == 0) {
appAttempt.retryFetchingAMContainer(appAttempt);
return RMAppAttemptState.SCHEDULED;
}
// Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
0));
appAttempt.setMasterContainer(amContainerAllocation.getContainers()
.get(0));
appAttempt.getSubmissionContext().setResource(
appAttempt.getMasterContainer().getResource());
appAttempt.getMasterContainer().getResource());
appAttempt.storeAttempt();
return RMAppAttemptState.ALLOCATED_SAVING;
}
}
private void retryFetchingAMContainer(final RMAppAttemptImpl appAttempt) {
// start a new thread so that we are not blocking main dispatcher thread.
new Thread() {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting to resend the"
+ " ContainerAllocated Event.");
}
appAttempt.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
appAttempt.applicationAttemptId));
}
}.start();
}
private static final class AttemptStoredTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,

View File

@ -25,16 +25,7 @@
public class RMAppAttemptContainerAllocatedEvent extends RMAppAttemptEvent {
private final Container container;
public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId,
Container container) {
public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId) {
super(appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED);
this.container = container;
}
public Container getContainer() {
return this.container;
}
}

View File

@ -340,7 +340,7 @@ private static final class ContainerStartedTransition extends
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
container.appAttemptId, container.container));
container.appAttemptId));
}
}

View File

@ -31,11 +31,16 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
public abstract class AbstractYarnScheduler implements ResourceScheduler {
protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication> applications;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {

View File

@ -23,10 +23,9 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class Allocation {
@ -38,24 +37,24 @@ public class Allocation {
final Set<ContainerId> strictContainers;
final Set<ContainerId> fungibleContainers;
final List<ResourceRequest> fungibleResources;
public Allocation(List<Container> containers, Resource resourceLimit) {
this(containers, resourceLimit, null, null, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers) {
this(containers, resourceLimit, strictContainers, null, null);
}
final List<NMToken> nmTokens;
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources) {
this(containers, resourceLimit,strictContainers, fungibleContainers,
fungibleResources, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) {
this.containers = containers;
this.resourceLimit = resourceLimit;
this.strictContainers = strictContainers;
this.fungibleContainers = fungibleContainers;
this.fungibleResources = fungibleResources;
this.nmTokens = nmTokens;
}
public List<Container> getContainers() {
@ -78,4 +77,8 @@ public List<ResourceRequest> getResourcePreemptions() {
return fungibleResources;
}
public List<NMToken> getNMTokens() {
return nmTokens;
}
}

View File

@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -33,6 +34,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -339,21 +341,61 @@ public Resource getCurrentConsumption() {
return currentConsumption;
}
public synchronized List<Container> pullNewlyAllocatedContainers() {
List<Container> returnContainerList = new ArrayList<Container>(
newlyAllocatedContainers.size());
for (RMContainer rmContainer : newlyAllocatedContainers) {
rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
RMContainerEventType.ACQUIRED));
Container container = rmContainer.getContainer();
rmContainer.getContainer().setContainerToken(
rmContext.getContainerTokenSecretManager().createContainerToken(
rmContainer.getContainerId(), container.getNodeId(), getUser(),
container.getResource()));
returnContainerList.add(rmContainer.getContainer());
public static class ContainersAndNMTokensAllocation {
List<Container> containerList;
List<NMToken> nmTokenList;
public ContainersAndNMTokensAllocation(List<Container> containerList,
List<NMToken> nmTokenList) {
this.containerList = containerList;
this.nmTokenList = nmTokenList;
}
newlyAllocatedContainers.clear();
return returnContainerList;
public List<Container> getContainerList() {
return containerList;
}
public List<NMToken> getNMTokenList() {
return nmTokenList;
}
}
// Create container token and NMToken altogether, if either of them fails for
// some reason like DNS unavailable, do not return this container and keep it
// in the newlyAllocatedContainers waiting to be refetched.
public synchronized ContainersAndNMTokensAllocation
pullNewlyAllocatedContainersAndNMTokens() {
List<Container> returnContainerList =
new ArrayList<Container>(newlyAllocatedContainers.size());
List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i
.hasNext();) {
RMContainer rmContainer = i.next();
Container container = rmContainer.getContainer();
try {
// create container token and NMToken altogether.
container.setContainerToken(rmContext.getContainerTokenSecretManager()
.createContainerToken(container.getId(), container.getNodeId(),
getUser(), container.getResource()));
NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container);
if (nmToken != null) {
nmTokens.add(nmToken);
}
} catch (IllegalArgumentException e) {
// DNS might be down, skip returning this container.
LOG.error(
"Error trying to assign container token to allocated container "
+ container.getId(), e);
continue;
}
returnContainerList.add(container);
i.remove();
rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
RMContainerEventType.ACQUIRED));
}
return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
}
public synchronized void updateBlacklist(

View File

@ -104,9 +104,6 @@ public class CapacityScheduler extends AbstractYarnScheduler
private CSQueue root;
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
@Override
public int compare(CSQueue q1, CSQueue q2) {
@ -557,9 +554,6 @@ private synchronized void doneApplicationAttempt(
}
}
private static final Allocation EMPTY_ALLOCATION =
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0, 0));
@Override
@Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId,

View File

@ -237,9 +237,11 @@ public synchronized Allocation getAllocation(ResourceCalculator rc,
ResourceRequest rr = ResourceRequest.newInstance(
Priority.UNDEFINED, ResourceRequest.ANY,
minimumAllocation, numCont);
return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(),
null, currentContPreemption,
Collections.singletonList(rr));
ContainersAndNMTokensAllocation allocation =
pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(), getHeadroom(), null,
currentContPreemption, Collections.singletonList(rr),
allocation.getNMTokenList());
}
}

View File

@ -78,6 +78,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -143,12 +144,6 @@ public class FairScheduler extends AbstractYarnScheduler {
// How often fair shares are re-calculated (ms)
protected long UPDATE_INTERVAL = 500;
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
private static final Allocation EMPTY_ALLOCATION =
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
// Aggregate metrics
FSQueueMetrics rootMetrics;
@ -922,9 +917,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
}
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
return new Allocation(application.pullNewlyAllocatedContainers(),
application.getHeadroom(), preemptionContainerIds);
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(),
application.getHeadroom(), preemptionContainerIds, null, null,
allocation.getNMTokenList());
}
}

View File

@ -50,7 +50,6 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -80,6 +79,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -114,9 +114,6 @@ public class FifoScheduler extends AbstractYarnScheduler implements
Configuration conf;
private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
private boolean initialized;
@ -264,8 +261,7 @@ public Resource getMaximumResourceCapability() {
}
}
private static final Allocation EMPTY_ALLOCATION =
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
@Override
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
@ -328,10 +324,11 @@ public Allocation allocate(
}
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
return new Allocation(
application.pullNewlyAllocatedContainers(),
application.getHeadroom());
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(),
application.getHeadroom(), null, null, null,
allocation.getNMTokenList());
}
}

View File

@ -18,10 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
@ -177,35 +175,39 @@ public void run() {
activateNextMasterKey();
}
}
public List<NMToken> createAndGetNMTokens(String applicationSubmitter,
ApplicationAttemptId appAttemptId, List<Container> containers) {
public NMToken createAndGetNMToken(String applicationSubmitter,
ApplicationAttemptId appAttemptId, Container container) {
try {
this.readLock.lock();
List<NMToken> nmTokens = new ArrayList<NMToken>();
HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId);
NMToken nmToken = null;
if (nodeSet != null) {
for (Container container : containers) {
if (!nodeSet.contains(container.getNodeId())) {
if (!nodeSet.contains(container.getNodeId())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending NMToken for nodeId : "
+ container.getNodeId().toString()
+ " for application attempt : " + appAttemptId.toString());
Token token = createNMToken(appAttemptId, container.getNodeId(),
applicationSubmitter);
NMToken nmToken =
NMToken.newInstance(container.getNodeId(), token);
nmTokens.add(nmToken);
// This will update the nmToken set.
}
Token token =
createNMToken(container.getId().getApplicationAttemptId(),
container.getNodeId(), applicationSubmitter);
nmToken = NMToken.newInstance(container.getNodeId(), token);
// The node set here is used for differentiating whether the NMToken
// has been issued for this node from the client's perspective. If
// this is an AM container, the NMToken is issued only for RM and so
// we should not update the node set.
if (container.getId().getId() != 1) {
nodeSet.add(container.getNodeId());
}
}
}
return nmTokens;
return nmToken;
} finally {
this.readLock.unlock();
}
}
public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) {
try {
this.writeLock.lock();

View File

@ -598,8 +598,7 @@ private Container allocateApplicationAttempt() {
applicationAttempt.handle(
new RMAppAttemptContainerAllocatedEvent(
applicationAttempt.getAppAttemptId(),
container));
applicationAttempt.getAppAttemptId()));
assertEquals(RMAppAttemptState.ALLOCATED_SAVING,
applicationAttempt.getAppAttemptState());

View File

@ -25,20 +25,29 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.Test;
@ -149,4 +158,92 @@ public void testContainerTokenGeneratedOnPullRequest() throws Exception {
Assert.assertNotNull(containers.get(0).getContainerToken());
rm1.stop();
}
@Test
public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{
YarnConfiguration conf = new YarnConfiguration();
MockRM rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
RMApp app1 = rm1.submitApp(200);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// request a container.
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
ContainerId containerId2 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
// acquire the container.
SecurityUtilTestHelper.setTokenServiceUseIp(true);
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
// not able to fetch the container;
Assert.assertEquals(0, containers.size());
SecurityUtilTestHelper.setTokenServiceUseIp(false);
containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
// should be able to fetch the container;
Assert.assertEquals(1, containers.size());
}
private volatile int numRetries = 0;
private class TestRMSecretManagerService extends RMSecretManagerService {
public TestRMSecretManagerService(Configuration conf,
RMContextImpl rmContext) {
super(conf, rmContext);
}
@Override
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
Configuration conf) {
return new RMContainerTokenSecretManager(conf) {
@Override
public Token createContainerToken(ContainerId containerId,
NodeId nodeId, String appSubmitter, Resource capability) {
numRetries++;
return super.createContainerToken(containerId, nodeId, appSubmitter,
capability);
}
};
}
}
// This is to test fetching AM container will be retried, if AM container is
// not fetchable since DNS is unavailable causing container token/NMtoken
// creation failure.
@Test(timeout = 20000)
public void testAMContainerAllocationWhenDNSUnavailable() throws Exception {
final YarnConfiguration conf = new YarnConfiguration();
MockRM rm1 = new MockRM(conf) {
@Override
protected RMSecretManagerService createRMSecretManagerService() {
return new TestRMSecretManagerService(conf, rmContext);
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
SecurityUtilTestHelper.setTokenServiceUseIp(true);
RMApp app1 = rm1.submitApp(200);
RMAppAttempt attempt = app1.getCurrentAppAttempt();
nm1.nodeHeartbeat(true);
// fetching am container will fail, keep retrying 5 times.
while (numRetries <= 5) {
nm1.nodeHeartbeat(true);
Thread.sleep(1000);
Assert.assertEquals(RMAppAttemptState.SCHEDULED,
attempt.getAppAttemptState());
System.out.println("Waiting for am container to be allocated.");
}
SecurityUtilTestHelper.setTokenServiceUseIp(false);
rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED);
MockRM.launchAndRegisterAM(app1, rm1, nm1);
}
}