From 5fd5c9900cfd299428acbc8dff767273e44647c0 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 19 Feb 2014 23:39:13 +0000 Subject: [PATCH] YARN-713. Fixed ResourceManager to not crash while building tokens when DNS issues happen transmittently. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1569979 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../ApplicationMasterService.java | 10 +- .../rmapp/attempt/RMAppAttemptImpl.java | 63 +++++++++--- .../RMAppAttemptContainerAllocatedEvent.java | 11 +-- .../rmcontainer/RMContainerImpl.java | 2 +- .../scheduler/AbstractYarnScheduler.java | 5 + .../resourcemanager/scheduler/Allocation.java | 25 ++--- .../SchedulerApplicationAttempt.java | 70 ++++++++++--- .../scheduler/capacity/CapacityScheduler.java | 6 -- .../common/fica/FiCaSchedulerApp.java | 8 +- .../scheduler/fair/FairScheduler.java | 15 ++- .../scheduler/fifo/FifoScheduler.java | 17 ++-- .../security/NMTokenSecretManagerInRM.java | 34 ++++--- .../attempt/TestRMAppAttemptTransitions.java | 3 +- .../capacity/TestContainerAllocation.java | 97 +++++++++++++++++++ 15 files changed, 264 insertions(+), 105 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5275d38978c..e942e6689a1 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -315,6 +315,9 @@ Release 2.4.0 - UNRELEASED application history store in the transition to the final state. (Contributed by Zhijie Shen) + YARN-713. Fixed ResourceManager to not crash while building tokens when DNS + issues happen transmittently. (Jian He via vinodkv) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 563dbe91567..db81dd86afa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -464,9 +464,11 @@ public AllocateResponse allocate(AllocateRequest request) blacklistAdditions, blacklistRemovals); RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class); + if (!allocation.getContainers().isEmpty()) { + allocateResponse.setNMTokens(allocation.getNMTokens()); + } // update the response with the deltas of node status changes List updatedNodes = new ArrayList(); @@ -505,12 +507,6 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); - // Adding NMTokens for allocated containers. - if (!allocation.getContainers().isEmpty()) { - allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager() - .createAndGetNMTokens(app.getUser(), appAttemptId, - allocation.getContainers())); - } /* * As we are updating the response inside the lock object so we don't * need to worry about unregister call occurring in between (which diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 8d69d08096a..3b845b1b8b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; @@ -202,7 +203,8 @@ RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition()) // Transitions from SCHEDULED State .addTransition(RMAppAttemptState.SCHEDULED, - RMAppAttemptState.ALLOCATED_SAVING, + EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.CONTAINER_ALLOCATED, new AMContainerAllocatedTransition()) .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING, @@ -769,8 +771,9 @@ public void transition(RMAppAttemptImpl appAttempt, private static final List EMPTY_CONTAINER_RELEASE_LIST = new ArrayList(); + private static final List EMPTY_CONTAINER_REQUEST_LIST = - new ArrayList(); + new ArrayList(); private static final class ScheduleTransition implements @@ -803,29 +806,57 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } } - private static final class AMContainerAllocatedTransition - extends BaseTransition { + private static final class AMContainerAllocatedTransition + implements + MultipleArcTransition { @Override - public void transition(RMAppAttemptImpl appAttempt, - RMAppAttemptEvent event) { + public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, + RMAppAttemptEvent event) { // Acquire the AM container from the scheduler. - Allocation amContainerAllocation = appAttempt.scheduler.allocate( - appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, - EMPTY_CONTAINER_RELEASE_LIST, null, null); + Allocation amContainerAllocation = + appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, + EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null, + null); // There must be at least one container allocated, because a // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed, - // and is put in SchedulerApplication#newlyAllocatedContainers. Then, - // YarnScheduler#allocate will fetch it. - assert amContainerAllocation.getContainers().size() != 0; + // and is put in SchedulerApplication#newlyAllocatedContainers. + + // Note that YarnScheduler#allocate is not guaranteed to be able to + // fetch it since container may not be fetchable for some reason like + // DNS unavailable causing container token not generated. As such, we + // return to the previous state and keep retry until am container is + // fetched. + if (amContainerAllocation.getContainers().size() == 0) { + appAttempt.retryFetchingAMContainer(appAttempt); + return RMAppAttemptState.SCHEDULED; + } // Set the masterContainer - appAttempt.setMasterContainer(amContainerAllocation.getContainers().get( - 0)); + appAttempt.setMasterContainer(amContainerAllocation.getContainers() + .get(0)); appAttempt.getSubmissionContext().setResource( - appAttempt.getMasterContainer().getResource()); + appAttempt.getMasterContainer().getResource()); appAttempt.storeAttempt(); + return RMAppAttemptState.ALLOCATED_SAVING; } } - + + private void retryFetchingAMContainer(final RMAppAttemptImpl appAttempt) { + // start a new thread so that we are not blocking main dispatcher thread. + new Thread() { + @Override + public void run() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting to resend the" + + " ContainerAllocated Event."); + } + appAttempt.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( + appAttempt.applicationAttemptId)); + } + }.start(); + } + private static final class AttemptStoredTransition extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAllocatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAllocatedEvent.java index e841f7af19b..681f38c2c2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAllocatedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAllocatedEvent.java @@ -25,16 +25,7 @@ public class RMAppAttemptContainerAllocatedEvent extends RMAppAttemptEvent { - private final Container container; - - public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId, - Container container) { + public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId) { super(appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED); - this.container = container; } - - public Container getContainer() { - return this.container; - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 057c9ace7e3..57fb703957a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -340,7 +340,7 @@ private static final class ContainerStartedTransition extends @Override public void transition(RMContainerImpl container, RMContainerEvent event) { container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( - container.appAttemptId, container.container)); + container.appAttemptId)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 4208d1db5e3..0f3af41b01c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -31,11 +31,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.util.resource.Resources; public abstract class AbstractYarnScheduler implements ResourceScheduler { protected RMContext rmContext; protected Map applications; + protected final static List EMPTY_CONTAINER_LIST = + new ArrayList(); + protected static final Allocation EMPTY_ALLOCATION = new Allocation( + EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); public synchronized List getTransferredContainers( ApplicationAttemptId currentAttempt) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index c03e31d8a31..3f2d8afd2e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -22,10 +22,9 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; public class Allocation { @@ -34,24 +33,24 @@ public class Allocation { final Set strictContainers; final Set fungibleContainers; final List fungibleResources; - - public Allocation(List containers, Resource resourceLimit) { - this(containers, resourceLimit, null, null, null); - } - - public Allocation(List containers, Resource resourceLimit, - Set strictContainers) { - this(containers, resourceLimit, strictContainers, null, null); - } + final List nmTokens; public Allocation(List containers, Resource resourceLimit, Set strictContainers, Set fungibleContainers, List fungibleResources) { + this(containers, resourceLimit,strictContainers, fungibleContainers, + fungibleResources, null); + } + + public Allocation(List containers, Resource resourceLimit, + Set strictContainers, Set fungibleContainers, + List fungibleResources, List nmTokens) { this.containers = containers; this.resourceLimit = resourceLimit; this.strictContainers = strictContainers; this.fungibleContainers = fungibleContainers; this.fungibleResources = fungibleResources; + this.nmTokens = nmTokens; } public List getContainers() { @@ -74,4 +73,8 @@ public List getResourcePreemptions() { return fungibleResources; } + public List getNMTokens() { + return nmTokens; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index b1801dc10d1..f35f76ff63d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -339,21 +341,61 @@ public Resource getCurrentConsumption() { return currentConsumption; } - public synchronized List pullNewlyAllocatedContainers() { - List returnContainerList = new ArrayList( - newlyAllocatedContainers.size()); - for (RMContainer rmContainer : newlyAllocatedContainers) { - rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), - RMContainerEventType.ACQUIRED)); - Container container = rmContainer.getContainer(); - rmContainer.getContainer().setContainerToken( - rmContext.getContainerTokenSecretManager().createContainerToken( - rmContainer.getContainerId(), container.getNodeId(), getUser(), - container.getResource())); - returnContainerList.add(rmContainer.getContainer()); + public static class ContainersAndNMTokensAllocation { + List containerList; + List nmTokenList; + + public ContainersAndNMTokensAllocation(List containerList, + List nmTokenList) { + this.containerList = containerList; + this.nmTokenList = nmTokenList; } - newlyAllocatedContainers.clear(); - return returnContainerList; + + public List getContainerList() { + return containerList; + } + + public List getNMTokenList() { + return nmTokenList; + } + } + + // Create container token and NMToken altogether, if either of them fails for + // some reason like DNS unavailable, do not return this container and keep it + // in the newlyAllocatedContainers waiting to be refetched. + public synchronized ContainersAndNMTokensAllocation + pullNewlyAllocatedContainersAndNMTokens() { + List returnContainerList = + new ArrayList(newlyAllocatedContainers.size()); + List nmTokens = new ArrayList(); + for (Iterator i = newlyAllocatedContainers.iterator(); i + .hasNext();) { + RMContainer rmContainer = i.next(); + Container container = rmContainer.getContainer(); + try { + // create container token and NMToken altogether. + container.setContainerToken(rmContext.getContainerTokenSecretManager() + .createContainerToken(container.getId(), container.getNodeId(), + getUser(), container.getResource())); + NMToken nmToken = + rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), + getApplicationAttemptId(), container); + if (nmToken != null) { + nmTokens.add(nmToken); + } + } catch (IllegalArgumentException e) { + // DNS might be down, skip returning this container. + LOG.error( + "Error trying to assign container token to allocated container " + + container.getId(), e); + continue; + } + returnContainerList.add(container); + i.remove(); + rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), + RMContainerEventType.ACQUIRED)); + } + return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens); } public synchronized void updateBlacklist( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6826c4941f7..b8f2376358c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -104,9 +104,6 @@ public class CapacityScheduler extends AbstractYarnScheduler private CSQueue root; - private final static List EMPTY_CONTAINER_LIST = - new ArrayList(); - static final Comparator queueComparator = new Comparator() { @Override public int compare(CSQueue q1, CSQueue q2) { @@ -557,9 +554,6 @@ private synchronized void doneApplicationAttempt( } } - private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0, 0)); - @Override @Lock(Lock.NoLock.class) public Allocation allocate(ApplicationAttemptId applicationAttemptId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 4be6b941d12..470cb106f18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -237,9 +237,11 @@ public synchronized Allocation getAllocation(ResourceCalculator rc, ResourceRequest rr = ResourceRequest.newInstance( Priority.UNDEFINED, ResourceRequest.ANY, minimumAllocation, numCont); - return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(), - null, currentContPreemption, - Collections.singletonList(rr)); + ContainersAndNMTokensAllocation allocation = + pullNewlyAllocatedContainersAndNMTokens(); + return new Allocation(allocation.getContainerList(), getHeadroom(), null, + currentContPreemption, Collections.singletonList(rr), + allocation.getNMTokenList()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index e23de7b3e90..a852d7b9242 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -143,12 +144,6 @@ public class FairScheduler extends AbstractYarnScheduler { // How often fair shares are re-calculated (ms) protected long UPDATE_INTERVAL = 500; - private final static List EMPTY_CONTAINER_LIST = - new ArrayList(); - - private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); - // Aggregate metrics FSQueueMetrics rootMetrics; @@ -922,9 +917,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, } application.updateBlacklist(blacklistAdditions, blacklistRemovals); - - return new Allocation(application.pullNewlyAllocatedContainers(), - application.getHeadroom(), preemptionContainerIds); + ContainersAndNMTokensAllocation allocation = + application.pullNewlyAllocatedContainersAndNMTokens(); + return new Allocation(allocation.getContainerList(), + application.getHeadroom(), preemptionContainerIds, null, null, + allocation.getNMTokenList()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index a2e01345abe..61628f95dc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -80,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -114,9 +114,6 @@ public class FifoScheduler extends AbstractYarnScheduler implements Configuration conf; - private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {}; - private final static List EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY); - protected Map nodes = new ConcurrentHashMap(); private boolean initialized; @@ -264,8 +261,7 @@ public Resource getMaximumResourceCapability() { } } - private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); + @Override public Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, @@ -328,10 +324,11 @@ public Allocation allocate( } application.updateBlacklist(blacklistAdditions, blacklistRemovals); - - return new Allocation( - application.pullNewlyAllocatedContainers(), - application.getHeadroom()); + ContainersAndNMTokensAllocation allocation = + application.pullNewlyAllocatedContainersAndNMTokens(); + return new Allocation(allocation.getContainerList(), + application.getHeadroom(), null, null, null, + allocation.getNMTokenList()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java index ab31eaf3af1..9ec7b690b57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java @@ -18,10 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; -import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; @@ -177,35 +175,39 @@ public void run() { activateNextMasterKey(); } } - - public List createAndGetNMTokens(String applicationSubmitter, - ApplicationAttemptId appAttemptId, List containers) { + + public NMToken createAndGetNMToken(String applicationSubmitter, + ApplicationAttemptId appAttemptId, Container container) { try { this.readLock.lock(); - List nmTokens = new ArrayList(); HashSet nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId); + NMToken nmToken = null; if (nodeSet != null) { - for (Container container : containers) { - if (!nodeSet.contains(container.getNodeId())) { + if (!nodeSet.contains(container.getNodeId())) { + if (LOG.isDebugEnabled()) { LOG.debug("Sending NMToken for nodeId : " + container.getNodeId().toString() + " for application attempt : " + appAttemptId.toString()); - Token token = createNMToken(appAttemptId, container.getNodeId(), - applicationSubmitter); - NMToken nmToken = - NMToken.newInstance(container.getNodeId(), token); - nmTokens.add(nmToken); - // This will update the nmToken set. + } + Token token = + createNMToken(container.getId().getApplicationAttemptId(), + container.getNodeId(), applicationSubmitter); + nmToken = NMToken.newInstance(container.getNodeId(), token); + // The node set here is used for differentiating whether the NMToken + // has been issued for this node from the client's perspective. If + // this is an AM container, the NMToken is issued only for RM and so + // we should not update the node set. + if (container.getId().getId() != 1) { nodeSet.add(container.getNodeId()); } } } - return nmTokens; + return nmToken; } finally { this.readLock.unlock(); } } - + public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) { try { this.writeLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index a34a42b5adf..dd57cf4ace0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -598,8 +598,7 @@ private Container allocateApplicationAttempt() { applicationAttempt.handle( new RMAppAttemptContainerAllocatedEvent( - applicationAttempt.getAppAttemptId(), - container)); + applicationAttempt.getAppAttemptId())); assertEquals(RMAppAttemptState.ALLOCATED_SAVING, applicationAttempt.getAppAttemptState()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 0e3bdeb2d4a..86e1b1e569f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -25,20 +25,29 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtilTestHelper; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService; import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.junit.Test; @@ -149,4 +158,92 @@ public void testContainerTokenGeneratedOnPullRequest() throws Exception { Assert.assertNotNull(containers.get(0).getContainerToken()); rm1.stop(); } + + @Test + public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{ + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container. + am1.allocate("127.0.0.1", 1024, 1, new ArrayList()); + ContainerId containerId2 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + + // acquire the container. + SecurityUtilTestHelper.setTokenServiceUseIp(true); + List containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + // not able to fetch the container; + Assert.assertEquals(0, containers.size()); + + SecurityUtilTestHelper.setTokenServiceUseIp(false); + containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + // should be able to fetch the container; + Assert.assertEquals(1, containers.size()); + } + + private volatile int numRetries = 0; + private class TestRMSecretManagerService extends RMSecretManagerService { + + public TestRMSecretManagerService(Configuration conf, + RMContextImpl rmContext) { + super(conf, rmContext); + } + @Override + protected RMContainerTokenSecretManager createContainerTokenSecretManager( + Configuration conf) { + return new RMContainerTokenSecretManager(conf) { + + @Override + public Token createContainerToken(ContainerId containerId, + NodeId nodeId, String appSubmitter, Resource capability) { + numRetries++; + return super.createContainerToken(containerId, nodeId, appSubmitter, + capability); + } + }; + } + } + + // This is to test fetching AM container will be retried, if AM container is + // not fetchable since DNS is unavailable causing container token/NMtoken + // creation failure. + @Test(timeout = 20000) + public void testAMContainerAllocationWhenDNSUnavailable() throws Exception { + final YarnConfiguration conf = new YarnConfiguration(); + MockRM rm1 = new MockRM(conf) { + @Override + protected RMSecretManagerService createRMSecretManagerService() { + return new TestRMSecretManagerService(conf, rmContext); + } + }; + rm1.start(); + + MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000); + SecurityUtilTestHelper.setTokenServiceUseIp(true); + RMApp app1 = rm1.submitApp(200); + RMAppAttempt attempt = app1.getCurrentAppAttempt(); + nm1.nodeHeartbeat(true); + + // fetching am container will fail, keep retrying 5 times. + while (numRetries <= 5) { + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + Assert.assertEquals(RMAppAttemptState.SCHEDULED, + attempt.getAppAttemptState()); + System.out.println("Waiting for am container to be allocated."); + } + + SecurityUtilTestHelper.setTokenServiceUseIp(false); + rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED); + MockRM.launchAndRegisterAM(app1, rm1, nm1); + } } \ No newline at end of file