diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java index a7f0eceb2ac..fb567d5f576 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java @@ -41,5 +41,5 @@ public abstract class QueuedContainersStatus { public abstract int getWaitQueueLength(); - public abstract void setWaitQueueLength(int queueWaitTime); + public abstract void setWaitQueueLength(int waitQueueLength); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index 4f9d5a359c8..707051fbb6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -200,6 +200,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { .getContainerTokenIdentifier().getContainerID(); this.context.getQueuingContext().getQueuedContainers().remove(containerId); try { + LOG.info("Starting container [" + containerId + "]"); super.startContainerInternal( allocatedContainerInfo.getContainerTokenIdentifier(), allocatedContainerInfo.getStartRequest()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java index 42c1dcd1e5f..fca814b7d1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java @@ -337,8 +337,10 @@ public final class LocalScheduler extends AbstractRequestInterceptor { @Override public DistSchedAllocateResponse allocateForDistributedScheduling (AllocateRequest request) throws YarnException, IOException { - LOG.info("Forwarding allocate request to the" + - "Distributed Scheduler Service on YARN RM"); + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the" + + "Distributed Scheduler Service on YARN RM"); + } // Partition requests into GUARANTEED and OPPORTUNISTIC reqs PartitionedResourceRequests partitionedAsks = partitionAskList(request .getAskList()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java index 03ba61d28ec..e33c389b732 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java @@ -84,14 +84,14 @@ public class OpportunisticContainerAllocator { Map allNodes, String userName) throws YarnException { Map> containers = new HashMap<>(); Set nodesAllocated = new HashSet<>(); - int numAsks = resourceAsks.size(); for (ResourceRequest anyAsk : resourceAsks) { allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId, allNodes, userName, containers, nodesAllocated, anyAsk); - } - if (numAsks > 0) { - LOG.info("Opportunistic allocation requested for: " + numAsks - + " containers; allocated = " + containers.size()); + LOG.info("Opportunistic allocation requested for [" + + "priority=" + anyAsk.getPriority() + + ", num_containers=" + anyAsk.getNumContainers() + + ", capability=" + anyAsk.getCapability() + "]" + + " allocated = " + containers.get(anyAsk.getCapability()).size()); } return containers; } @@ -129,8 +129,9 @@ public class OpportunisticContainerAllocator { } cList.add(container); numAllocated++; - LOG.info("Allocated " + numAllocated + " opportunistic containers."); + LOG.info("Allocated [" + container.getId() + "] as opportunistic."); } + LOG.info("Allocated " + numAllocated + " opportunistic containers."); } private Container buildContainer(DistSchedulerParams appParams, @@ -146,8 +147,8 @@ public class OpportunisticContainerAllocator { long currTime = System.currentTimeMillis(); ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier( - cId, nodeId.getHost(), userName, capability, - currTime + appParams.containerTokenExpiryInterval, + cId, nodeId.getHost() + ":" + nodeId.getPort(), userName, + capability, currTime + appParams.containerTokenExpiryInterval, context.getContainerTokenSecretManager().getCurrentKey().getKeyId(), nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime, null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java index efc682a5bc7..e987e79e303 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.scheduler; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security .NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security .NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -196,9 +198,14 @@ public class TestLocalScheduler { } private Map> mapAllocs(AllocateResponse - allocateResponse) { + allocateResponse) throws Exception { Map> allocs = new HashMap<>(); for (Container c : allocateResponse.getAllocatedContainers()) { + ContainerTokenIdentifier cTokId = BuilderUtils + .newContainerTokenIdentifier(c.getContainerToken()); + Assert.assertEquals( + c.getNodeId().getHost() + ":" + c.getNodeId().getPort(), + cTokId.getNmHostAddress()); List cIds = allocs.get(c.getNodeId()); if (cIds == null) { cIds = new ArrayList<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index 21f4f6e16ad..017a256e043 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -195,11 +195,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { new ClusterNode(rmNode.getNodeID()) .setQueueWaitTime(estimatedQueueWaitTime) .setQueueLength(waitQueueLength)); - LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + estimatedQueueWaitTime + "] and " + "wait queue length [" + waitQueueLength + "]"); } else { - LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + estimatedQueueWaitTime + "] and " + "wait queue length [" + waitQueueLength + "]"); } @@ -210,12 +210,14 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { .setQueueWaitTime(estimatedQueueWaitTime) .setQueueLength(waitQueueLength) .updateTimestamp(); - LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); + if (LOG.isDebugEnabled()) { + LOG.debug("Updating ClusterNode [" + rmNode.getNodeID() + "] " + + "with queue wait time [" + estimatedQueueWaitTime + "] and " + + "wait queue length [" + waitQueueLength + "]"); + } } else { this.clusterNodes.remove(rmNode.getNodeID()); - LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + currentNode.queueWaitTime + "] and " + "wait queue length [" + currentNode.queueLength + "]"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 2372ea271a5..de4e22d6b79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -75,6 +75,9 @@ import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; @@ -713,8 +716,14 @@ public class MiniYARNCluster extends CompositeService { ContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { - return new CustomContainerManagerImpl(context, exec, del, - nodeStatusUpdater, metrics, dirsHandler); + if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, + YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) { + return new CustomQueueingContainerManagerImpl(context, exec, del, + nodeStatusUpdater, metrics, dirsHandler); + } else { + return new CustomContainerManagerImpl(context, exec, del, + nodeStatusUpdater, metrics, dirsHandler); + } } } @@ -846,6 +855,55 @@ public class MiniYARNCluster extends CompositeService { } } + private class CustomQueueingContainerManagerImpl extends + QueuingContainerManagerImpl { + + public CustomQueueingContainerManagerImpl(Context context, + ContainerExecutor exec, DeletionService del, NodeStatusUpdater + nodeStatusUpdater, NodeManagerMetrics metrics, + LocalDirsHandlerService dirsHandler) { + super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler); + } + + @Override + protected ContainersMonitor createContainersMonitor(ContainerExecutor + exec) { + return new ContainersMonitorImpl(exec, dispatcher, this.context) { + + @Override + public void increaseContainersAllocation(ProcessTreeInfo pti) { } + + @Override + public void decreaseContainersAllocation(ProcessTreeInfo pti) { } + + @Override + public boolean hasResourcesAvailable( + ContainersMonitorImpl.ProcessTreeInfo pti) { + return true; + } + }; + } + + @Override + protected void createAMRMProxyService(Configuration conf) { + this.amrmProxyEnabled = + conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); + + if (this.amrmProxyEnabled) { + LOG.info("CustomAMRMProxyService is enabled. " + + "All the AM->RM requests will be intercepted by the proxy"); + AMRMProxyService amrmProxyService = + useRpc ? new AMRMProxyService(getContext(), dispatcher) + : new ShortCircuitedAMRMProxy(getContext(), dispatcher); + this.setAMRMProxyService(amrmProxyService); + addService(this.getAMRMProxyService()); + } else { + LOG.info("CustomAMRMProxyService is disabled"); + } + } + } + private class ShortCircuitedAMRMProxy extends AMRMProxyService { public ShortCircuitedAMRMProxy(Context context,