YARN-713. Fixed ResourceManager to not crash while building tokens when DNS issues happen transmittently. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1569979 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-19 23:39:13 +00:00
parent 0369aff403
commit 5fd5c9900c
15 changed files with 264 additions and 105 deletions

View File

@ -315,6 +315,9 @@ Release 2.4.0 - UNRELEASED
application history store in the transition to the final state. (Contributed application history store in the transition to the final state. (Contributed
by Zhijie Shen) by Zhijie Shen)
YARN-713. Fixed ResourceManager to not crash while building tokens when DNS
issues happen transmittently. (Jian He via vinodkv)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -464,9 +464,11 @@ public AllocateResponse allocate(AllocateRequest request)
blacklistAdditions, blacklistRemovals); blacklistAdditions, blacklistRemovals);
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AllocateResponse allocateResponse = AllocateResponse allocateResponse =
recordFactory.newRecordInstance(AllocateResponse.class); recordFactory.newRecordInstance(AllocateResponse.class);
if (!allocation.getContainers().isEmpty()) {
allocateResponse.setNMTokens(allocation.getNMTokens());
}
// update the response with the deltas of node status changes // update the response with the deltas of node status changes
List<RMNode> updatedNodes = new ArrayList<RMNode>(); List<RMNode> updatedNodes = new ArrayList<RMNode>();
@ -505,12 +507,6 @@ public AllocateResponse allocate(AllocateRequest request)
allocateResponse allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation)); .setPreemptionMessage(generatePreemptionMessage(allocation));
// Adding NMTokens for allocated containers.
if (!allocation.getContainers().isEmpty()) {
allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
.createAndGetNMTokens(app.getUser(), appAttemptId,
allocation.getContainers()));
}
/* /*
* As we are updating the response inside the lock object so we don't * As we are updating the response inside the lock object so we don't
* need to worry about unregister call occurring in between (which * need to worry about unregister call occurring in between (which

View File

@ -78,6 +78,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
@ -202,7 +203,8 @@ RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
// Transitions from SCHEDULED State // Transitions from SCHEDULED State
.addTransition(RMAppAttemptState.SCHEDULED, .addTransition(RMAppAttemptState.SCHEDULED,
RMAppAttemptState.ALLOCATED_SAVING, EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.CONTAINER_ALLOCATED, RMAppAttemptEventType.CONTAINER_ALLOCATED,
new AMContainerAllocatedTransition()) new AMContainerAllocatedTransition())
.addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING, .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING,
@ -769,6 +771,7 @@ public void transition(RMAppAttemptImpl appAttempt,
private static final List<ContainerId> EMPTY_CONTAINER_RELEASE_LIST = private static final List<ContainerId> EMPTY_CONTAINER_RELEASE_LIST =
new ArrayList<ContainerId>(); new ArrayList<ContainerId>();
private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST = private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST =
new ArrayList<ResourceRequest>(); new ArrayList<ResourceRequest>();
@ -804,28 +807,56 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
} }
private static final class AMContainerAllocatedTransition private static final class AMContainerAllocatedTransition
extends BaseTransition { implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@Override @Override
public void transition(RMAppAttemptImpl appAttempt, public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) { RMAppAttemptEvent event) {
// Acquire the AM container from the scheduler. // Acquire the AM container from the scheduler.
Allocation amContainerAllocation = appAttempt.scheduler.allocate( Allocation amContainerAllocation =
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
EMPTY_CONTAINER_RELEASE_LIST, null, null); EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
null);
// There must be at least one container allocated, because a // There must be at least one container allocated, because a
// CONTAINER_ALLOCATED is emitted after an RMContainer is constructed, // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
// and is put in SchedulerApplication#newlyAllocatedContainers. Then, // and is put in SchedulerApplication#newlyAllocatedContainers.
// YarnScheduler#allocate will fetch it.
assert amContainerAllocation.getContainers().size() != 0; // Note that YarnScheduler#allocate is not guaranteed to be able to
// fetch it since container may not be fetchable for some reason like
// DNS unavailable causing container token not generated. As such, we
// return to the previous state and keep retry until am container is
// fetched.
if (amContainerAllocation.getContainers().size() == 0) {
appAttempt.retryFetchingAMContainer(appAttempt);
return RMAppAttemptState.SCHEDULED;
}
// Set the masterContainer // Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers().get( appAttempt.setMasterContainer(amContainerAllocation.getContainers()
0)); .get(0));
appAttempt.getSubmissionContext().setResource( appAttempt.getSubmissionContext().setResource(
appAttempt.getMasterContainer().getResource()); appAttempt.getMasterContainer().getResource());
appAttempt.storeAttempt(); appAttempt.storeAttempt();
return RMAppAttemptState.ALLOCATED_SAVING;
} }
} }
private void retryFetchingAMContainer(final RMAppAttemptImpl appAttempt) {
// start a new thread so that we are not blocking main dispatcher thread.
new Thread() {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting to resend the"
+ " ContainerAllocated Event.");
}
appAttempt.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
appAttempt.applicationAttemptId));
}
}.start();
}
private static final class AttemptStoredTransition extends BaseTransition { private static final class AttemptStoredTransition extends BaseTransition {
@Override @Override
public void transition(RMAppAttemptImpl appAttempt, public void transition(RMAppAttemptImpl appAttempt,

View File

@ -25,16 +25,7 @@
public class RMAppAttemptContainerAllocatedEvent extends RMAppAttemptEvent { public class RMAppAttemptContainerAllocatedEvent extends RMAppAttemptEvent {
private final Container container; public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId) {
public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId,
Container container) {
super(appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED); super(appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED);
this.container = container;
} }
public Container getContainer() {
return this.container;
}
} }

View File

@ -340,7 +340,7 @@ private static final class ContainerStartedTransition extends
@Override @Override
public void transition(RMContainerImpl container, RMContainerEvent event) { public void transition(RMContainerImpl container, RMContainerEvent event) {
container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
container.appAttemptId, container.container)); container.appAttemptId));
} }
} }

View File

@ -31,11 +31,16 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
public abstract class AbstractYarnScheduler implements ResourceScheduler { public abstract class AbstractYarnScheduler implements ResourceScheduler {
protected RMContext rmContext; protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication> applications; protected Map<ApplicationId, SchedulerApplication> applications;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
public synchronized List<Container> getTransferredContainers( public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) { ApplicationAttemptId currentAttempt) {

View File

@ -22,10 +22,9 @@
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class Allocation { public class Allocation {
@ -34,24 +33,24 @@ public class Allocation {
final Set<ContainerId> strictContainers; final Set<ContainerId> strictContainers;
final Set<ContainerId> fungibleContainers; final Set<ContainerId> fungibleContainers;
final List<ResourceRequest> fungibleResources; final List<ResourceRequest> fungibleResources;
final List<NMToken> nmTokens;
public Allocation(List<Container> containers, Resource resourceLimit) {
this(containers, resourceLimit, null, null, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers) {
this(containers, resourceLimit, strictContainers, null, null);
}
public Allocation(List<Container> containers, Resource resourceLimit, public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources) { List<ResourceRequest> fungibleResources) {
this(containers, resourceLimit,strictContainers, fungibleContainers,
fungibleResources, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) {
this.containers = containers; this.containers = containers;
this.resourceLimit = resourceLimit; this.resourceLimit = resourceLimit;
this.strictContainers = strictContainers; this.strictContainers = strictContainers;
this.fungibleContainers = fungibleContainers; this.fungibleContainers = fungibleContainers;
this.fungibleResources = fungibleResources; this.fungibleResources = fungibleResources;
this.nmTokens = nmTokens;
} }
public List<Container> getContainers() { public List<Container> getContainers() {
@ -74,4 +73,8 @@ public List<ResourceRequest> getResourcePreemptions() {
return fungibleResources; return fungibleResources;
} }
public List<NMToken> getNMTokens() {
return nmTokens;
}
} }

View File

@ -20,6 +20,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -33,6 +34,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -339,21 +341,61 @@ public Resource getCurrentConsumption() {
return currentConsumption; return currentConsumption;
} }
public synchronized List<Container> pullNewlyAllocatedContainers() { public static class ContainersAndNMTokensAllocation {
List<Container> returnContainerList = new ArrayList<Container>( List<Container> containerList;
newlyAllocatedContainers.size()); List<NMToken> nmTokenList;
for (RMContainer rmContainer : newlyAllocatedContainers) {
public ContainersAndNMTokensAllocation(List<Container> containerList,
List<NMToken> nmTokenList) {
this.containerList = containerList;
this.nmTokenList = nmTokenList;
}
public List<Container> getContainerList() {
return containerList;
}
public List<NMToken> getNMTokenList() {
return nmTokenList;
}
}
// Create container token and NMToken altogether, if either of them fails for
// some reason like DNS unavailable, do not return this container and keep it
// in the newlyAllocatedContainers waiting to be refetched.
public synchronized ContainersAndNMTokensAllocation
pullNewlyAllocatedContainersAndNMTokens() {
List<Container> returnContainerList =
new ArrayList<Container>(newlyAllocatedContainers.size());
List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i
.hasNext();) {
RMContainer rmContainer = i.next();
Container container = rmContainer.getContainer();
try {
// create container token and NMToken altogether.
container.setContainerToken(rmContext.getContainerTokenSecretManager()
.createContainerToken(container.getId(), container.getNodeId(),
getUser(), container.getResource()));
NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container);
if (nmToken != null) {
nmTokens.add(nmToken);
}
} catch (IllegalArgumentException e) {
// DNS might be down, skip returning this container.
LOG.error(
"Error trying to assign container token to allocated container "
+ container.getId(), e);
continue;
}
returnContainerList.add(container);
i.remove();
rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
RMContainerEventType.ACQUIRED)); RMContainerEventType.ACQUIRED));
Container container = rmContainer.getContainer();
rmContainer.getContainer().setContainerToken(
rmContext.getContainerTokenSecretManager().createContainerToken(
rmContainer.getContainerId(), container.getNodeId(), getUser(),
container.getResource()));
returnContainerList.add(rmContainer.getContainer());
} }
newlyAllocatedContainers.clear(); return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
return returnContainerList;
} }
public synchronized void updateBlacklist( public synchronized void updateBlacklist(

View File

@ -104,9 +104,6 @@ public class CapacityScheduler extends AbstractYarnScheduler
private CSQueue root; private CSQueue root;
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() { static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
@Override @Override
public int compare(CSQueue q1, CSQueue q2) { public int compare(CSQueue q1, CSQueue q2) {
@ -557,9 +554,6 @@ private synchronized void doneApplicationAttempt(
} }
} }
private static final Allocation EMPTY_ALLOCATION =
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0, 0));
@Override @Override
@Lock(Lock.NoLock.class) @Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId, public Allocation allocate(ApplicationAttemptId applicationAttemptId,

View File

@ -237,9 +237,11 @@ public synchronized Allocation getAllocation(ResourceCalculator rc,
ResourceRequest rr = ResourceRequest.newInstance( ResourceRequest rr = ResourceRequest.newInstance(
Priority.UNDEFINED, ResourceRequest.ANY, Priority.UNDEFINED, ResourceRequest.ANY,
minimumAllocation, numCont); minimumAllocation, numCont);
return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(), ContainersAndNMTokensAllocation allocation =
null, currentContPreemption, pullNewlyAllocatedContainersAndNMTokens();
Collections.singletonList(rr)); return new Allocation(allocation.getContainerList(), getHeadroom(), null,
currentContPreemption, Collections.singletonList(rr),
allocation.getNMTokenList());
} }
} }

View File

@ -78,6 +78,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -143,12 +144,6 @@ public class FairScheduler extends AbstractYarnScheduler {
// How often fair shares are re-calculated (ms) // How often fair shares are re-calculated (ms)
protected long UPDATE_INTERVAL = 500; protected long UPDATE_INTERVAL = 500;
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
private static final Allocation EMPTY_ALLOCATION =
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
// Aggregate metrics // Aggregate metrics
FSQueueMetrics rootMetrics; FSQueueMetrics rootMetrics;
@ -922,9 +917,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
} }
application.updateBlacklist(blacklistAdditions, blacklistRemovals); application.updateBlacklist(blacklistAdditions, blacklistRemovals);
ContainersAndNMTokensAllocation allocation =
return new Allocation(application.pullNewlyAllocatedContainers(), application.pullNewlyAllocatedContainersAndNMTokens();
application.getHeadroom(), preemptionContainerIds); return new Allocation(allocation.getContainerList(),
application.getHeadroom(), preemptionContainerIds, null, null,
allocation.getNMTokenList());
} }
} }

View File

@ -50,7 +50,6 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -80,6 +79,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -114,9 +114,6 @@ public class FifoScheduler extends AbstractYarnScheduler implements
Configuration conf; Configuration conf;
private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>(); protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
private boolean initialized; private boolean initialized;
@ -264,8 +261,7 @@ public Resource getMaximumResourceCapability() {
} }
} }
private static final Allocation EMPTY_ALLOCATION =
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
@Override @Override
public Allocation allocate( public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
@ -328,10 +324,11 @@ public Allocation allocate(
} }
application.updateBlacklist(blacklistAdditions, blacklistRemovals); application.updateBlacklist(blacklistAdditions, blacklistRemovals);
ContainersAndNMTokensAllocation allocation =
return new Allocation( application.pullNewlyAllocatedContainersAndNMTokens();
application.pullNewlyAllocatedContainers(), return new Allocation(allocation.getContainerList(),
application.getHeadroom()); application.getHeadroom(), null, null, null,
allocation.getNMTokenList());
} }
} }

View File

@ -18,10 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.security; package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -178,29 +176,33 @@ public void run() {
} }
} }
public List<NMToken> createAndGetNMTokens(String applicationSubmitter, public NMToken createAndGetNMToken(String applicationSubmitter,
ApplicationAttemptId appAttemptId, List<Container> containers) { ApplicationAttemptId appAttemptId, Container container) {
try { try {
this.readLock.lock(); this.readLock.lock();
List<NMToken> nmTokens = new ArrayList<NMToken>();
HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId); HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId);
NMToken nmToken = null;
if (nodeSet != null) { if (nodeSet != null) {
for (Container container : containers) {
if (!nodeSet.contains(container.getNodeId())) { if (!nodeSet.contains(container.getNodeId())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending NMToken for nodeId : " LOG.debug("Sending NMToken for nodeId : "
+ container.getNodeId().toString() + container.getNodeId().toString()
+ " for application attempt : " + appAttemptId.toString()); + " for application attempt : " + appAttemptId.toString());
Token token = createNMToken(appAttemptId, container.getNodeId(), }
applicationSubmitter); Token token =
NMToken nmToken = createNMToken(container.getId().getApplicationAttemptId(),
NMToken.newInstance(container.getNodeId(), token); container.getNodeId(), applicationSubmitter);
nmTokens.add(nmToken); nmToken = NMToken.newInstance(container.getNodeId(), token);
// This will update the nmToken set. // The node set here is used for differentiating whether the NMToken
// has been issued for this node from the client's perspective. If
// this is an AM container, the NMToken is issued only for RM and so
// we should not update the node set.
if (container.getId().getId() != 1) {
nodeSet.add(container.getNodeId()); nodeSet.add(container.getNodeId());
} }
} }
} }
return nmTokens; return nmToken;
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
} }

View File

@ -598,8 +598,7 @@ private Container allocateApplicationAttempt() {
applicationAttempt.handle( applicationAttempt.handle(
new RMAppAttemptContainerAllocatedEvent( new RMAppAttemptContainerAllocatedEvent(
applicationAttempt.getAppAttemptId(), applicationAttempt.getAppAttemptId()));
container));
assertEquals(RMAppAttemptState.ALLOCATED_SAVING, assertEquals(RMAppAttemptState.ALLOCATED_SAVING,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());

View File

@ -25,20 +25,29 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.Test; import org.junit.Test;
@ -149,4 +158,92 @@ public void testContainerTokenGeneratedOnPullRequest() throws Exception {
Assert.assertNotNull(containers.get(0).getContainerToken()); Assert.assertNotNull(containers.get(0).getContainerToken());
rm1.stop(); rm1.stop();
} }
@Test
public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{
YarnConfiguration conf = new YarnConfiguration();
MockRM rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
RMApp app1 = rm1.submitApp(200);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// request a container.
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
ContainerId containerId2 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
// acquire the container.
SecurityUtilTestHelper.setTokenServiceUseIp(true);
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
// not able to fetch the container;
Assert.assertEquals(0, containers.size());
SecurityUtilTestHelper.setTokenServiceUseIp(false);
containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
// should be able to fetch the container;
Assert.assertEquals(1, containers.size());
}
private volatile int numRetries = 0;
private class TestRMSecretManagerService extends RMSecretManagerService {
public TestRMSecretManagerService(Configuration conf,
RMContextImpl rmContext) {
super(conf, rmContext);
}
@Override
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
Configuration conf) {
return new RMContainerTokenSecretManager(conf) {
@Override
public Token createContainerToken(ContainerId containerId,
NodeId nodeId, String appSubmitter, Resource capability) {
numRetries++;
return super.createContainerToken(containerId, nodeId, appSubmitter,
capability);
}
};
}
}
// This is to test fetching AM container will be retried, if AM container is
// not fetchable since DNS is unavailable causing container token/NMtoken
// creation failure.
@Test(timeout = 20000)
public void testAMContainerAllocationWhenDNSUnavailable() throws Exception {
final YarnConfiguration conf = new YarnConfiguration();
MockRM rm1 = new MockRM(conf) {
@Override
protected RMSecretManagerService createRMSecretManagerService() {
return new TestRMSecretManagerService(conf, rmContext);
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
SecurityUtilTestHelper.setTokenServiceUseIp(true);
RMApp app1 = rm1.submitApp(200);
RMAppAttempt attempt = app1.getCurrentAppAttempt();
nm1.nodeHeartbeat(true);
// fetching am container will fail, keep retrying 5 times.
while (numRetries <= 5) {
nm1.nodeHeartbeat(true);
Thread.sleep(1000);
Assert.assertEquals(RMAppAttemptState.SCHEDULED,
attempt.getAppAttemptState());
System.out.println("Waiting for am container to be allocated.");
}
SecurityUtilTestHelper.setTokenServiceUseIp(false);
rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED);
MockRM.launchAndRegisterAM(app1, rm1, nm1);
}
} }