From 15808717f37b7b30d1de2e4ba774cad420c22173 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Wed, 18 May 2016 18:46:00 -0700 Subject: [PATCH] YARN-5110. Fix OpportunisticContainerAllocator to insert complete HostAddress in issued ContainerTokenIds. (Konstantinos Karanasos via asuresh) (cherry picked from commit 1597630681c784a3d59f5605b87e96197b8139d7) --- .../api/records/QueuedContainersStatus.java | 2 +- .../queuing/QueuingContainerManagerImpl.java | 1 + .../nodemanager/scheduler/LocalScheduler.java | 6 +- .../OpportunisticContainerAllocator.java | 17 ++--- .../scheduler/TestLocalScheduler.java | 9 ++- .../distributed/NodeQueueLoadMonitor.java | 14 +++-- .../hadoop/yarn/server/MiniYARNCluster.java | 62 ++++++++++++++++++- 7 files changed, 91 insertions(+), 20 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java index a7f0eceb2ac..fb567d5f576 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java @@ -41,5 +41,5 @@ public static QueuedContainersStatus newInstance() { public abstract int getWaitQueueLength(); - public abstract void setWaitQueueLength(int queueWaitTime); + public abstract void setWaitQueueLength(int waitQueueLength); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index 4f9d5a359c8..707051fbb6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -200,6 +200,7 @@ private void startAllocatedContainer( .getContainerTokenIdentifier().getContainerID(); this.context.getQueuingContext().getQueuedContainers().remove(containerId); try { + LOG.info("Starting container [" + containerId + "]"); super.startContainerInternal( allocatedContainerInfo.getContainerTokenIdentifier(), allocatedContainerInfo.getStartRequest()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java index 42c1dcd1e5f..fca814b7d1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java @@ -337,8 +337,10 @@ private void addToNodeList(List nodes) { @Override public DistSchedAllocateResponse allocateForDistributedScheduling (AllocateRequest request) throws YarnException, IOException { - LOG.info("Forwarding allocate request to the" + - "Distributed Scheduler Service on YARN RM"); + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the" + + "Distributed Scheduler Service on YARN RM"); + } // Partition requests into GUARANTEED and OPPORTUNISTIC reqs PartitionedResourceRequests partitionedAsks = partitionAskList(request .getAskList()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java index 03ba61d28ec..e33c389b732 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java @@ -84,14 +84,14 @@ public Map> allocate(DistSchedulerParams appParams, Map allNodes, String userName) throws YarnException { Map> containers = new HashMap<>(); Set nodesAllocated = new HashSet<>(); - int numAsks = resourceAsks.size(); for (ResourceRequest anyAsk : resourceAsks) { allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId, allNodes, userName, containers, nodesAllocated, anyAsk); - } - if (numAsks > 0) { - LOG.info("Opportunistic allocation requested for: " + numAsks - + " containers; allocated = " + containers.size()); + LOG.info("Opportunistic allocation requested for [" + + "priority=" + anyAsk.getPriority() + + ", num_containers=" + anyAsk.getNumContainers() + + ", capability=" + anyAsk.getCapability() + "]" + + " allocated = " + containers.get(anyAsk.getCapability()).size()); } return containers; } @@ -129,8 +129,9 @@ private void allocateOpportunisticContainers(DistSchedulerParams appParams, } cList.add(container); numAllocated++; - LOG.info("Allocated " + numAllocated + " opportunistic containers."); + LOG.info("Allocated [" + container.getId() + "] as opportunistic."); } + LOG.info("Allocated " + numAllocated + " opportunistic containers."); } private Container buildContainer(DistSchedulerParams appParams, @@ -146,8 +147,8 @@ private Container buildContainer(DistSchedulerParams appParams, long currTime = System.currentTimeMillis(); ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier( - cId, nodeId.getHost(), userName, capability, - currTime + appParams.containerTokenExpiryInterval, + cId, nodeId.getHost() + ":" + nodeId.getPort(), userName, + capability, currTime + appParams.containerTokenExpiryInterval, context.getContainerTokenSecretManager().getCurrentKey().getKeyId(), nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime, null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java index efc682a5bc7..e987e79e303 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.scheduler; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -45,6 +46,7 @@ .NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security .NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -196,9 +198,14 @@ private DistSchedAllocateResponse createAllocateResponse(List nodes) { } private Map> mapAllocs(AllocateResponse - allocateResponse) { + allocateResponse) throws Exception { Map> allocs = new HashMap<>(); for (Container c : allocateResponse.getAllocatedContainers()) { + ContainerTokenIdentifier cTokId = BuilderUtils + .newContainerTokenIdentifier(c.getContainerToken()); + Assert.assertEquals( + c.getNodeId().getHost() + ":" + c.getNodeId().getPort(), + cTokId.getNmHostAddress()); List cIds = allocs.get(c.getNodeId()); if (cIds == null) { cIds = new ArrayList<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index 21f4f6e16ad..017a256e043 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -195,11 +195,11 @@ public void updateNode(RMNode rmNode) { new ClusterNode(rmNode.getNodeID()) .setQueueWaitTime(estimatedQueueWaitTime) .setQueueLength(waitQueueLength)); - LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + estimatedQueueWaitTime + "] and " + "wait queue length [" + waitQueueLength + "]"); } else { - LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + estimatedQueueWaitTime + "] and " + "wait queue length [" + waitQueueLength + "]"); } @@ -210,12 +210,14 @@ public void updateNode(RMNode rmNode) { .setQueueWaitTime(estimatedQueueWaitTime) .setQueueLength(waitQueueLength) .updateTimestamp(); - LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); + if (LOG.isDebugEnabled()) { + LOG.debug("Updating ClusterNode [" + rmNode.getNodeID() + "] " + + "with queue wait time [" + estimatedQueueWaitTime + "] and " + + "wait queue length [" + waitQueueLength + "]"); + } } else { this.clusterNodes.remove(rmNode.getNodeID()); - LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + currentNode.queueWaitTime + "] and " + "wait queue length [" + currentNode.queueLength + "]"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 2372ea271a5..de4e22d6b79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -75,6 +75,9 @@ import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; @@ -713,8 +716,14 @@ protected ContainerManagerImpl createContainerManager(Context context, ContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { - return new CustomContainerManagerImpl(context, exec, del, - nodeStatusUpdater, metrics, dirsHandler); + if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, + YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) { + return new CustomQueueingContainerManagerImpl(context, exec, del, + nodeStatusUpdater, metrics, dirsHandler); + } else { + return new CustomContainerManagerImpl(context, exec, del, + nodeStatusUpdater, metrics, dirsHandler); + } } } @@ -846,6 +855,55 @@ protected void createAMRMProxyService(Configuration conf) { } } + private class CustomQueueingContainerManagerImpl extends + QueuingContainerManagerImpl { + + public CustomQueueingContainerManagerImpl(Context context, + ContainerExecutor exec, DeletionService del, NodeStatusUpdater + nodeStatusUpdater, NodeManagerMetrics metrics, + LocalDirsHandlerService dirsHandler) { + super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler); + } + + @Override + protected ContainersMonitor createContainersMonitor(ContainerExecutor + exec) { + return new ContainersMonitorImpl(exec, dispatcher, this.context) { + + @Override + public void increaseContainersAllocation(ProcessTreeInfo pti) { } + + @Override + public void decreaseContainersAllocation(ProcessTreeInfo pti) { } + + @Override + public boolean hasResourcesAvailable( + ContainersMonitorImpl.ProcessTreeInfo pti) { + return true; + } + }; + } + + @Override + protected void createAMRMProxyService(Configuration conf) { + this.amrmProxyEnabled = + conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); + + if (this.amrmProxyEnabled) { + LOG.info("CustomAMRMProxyService is enabled. " + + "All the AM->RM requests will be intercepted by the proxy"); + AMRMProxyService amrmProxyService = + useRpc ? new AMRMProxyService(getContext(), dispatcher) + : new ShortCircuitedAMRMProxy(getContext(), dispatcher); + this.setAMRMProxyService(amrmProxyService); + addService(this.getAMRMProxyService()); + } else { + LOG.info("CustomAMRMProxyService is disabled"); + } + } + } + private class ShortCircuitedAMRMProxy extends AMRMProxyService { public ShortCircuitedAMRMProxy(Context context,