diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index b2ff7cdb455..ace001aca84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -238,6 +238,11 @@ public class ContainerScheduler extends AbstractService implements return this.queuedOpportunisticContainers.size(); } + @VisibleForTesting + public int getNumRunningContainers() { + return this.runningContainers.size(); + } + public OpportunisticContainersStatus getOpportunisticContainersStatus() { this.opportunisticContainersStatus.setQueuedOpportContainers( getNumQueuedOpportunisticContainers()); @@ -274,27 +279,32 @@ public class ContainerScheduler extends AbstractService implements ExecutionType.OPPORTUNISTIC) { this.metrics.completeOpportunisticContainer(container.getResource()); } - startPendingContainers(); + startPendingContainers(false); } } - private void startPendingContainers() { + /** + * Start pending containers in the queue. + * @param forceStartGuaranteedContaieners When this is true, start guaranteed + * container without looking at available resource + */ + private void startPendingContainers(boolean forceStartGuaranteedContaieners) { // Start pending guaranteed containers, if resources available. - boolean resourcesAvailable = - startContainersFromQueue(queuedGuaranteedContainers.values()); + boolean resourcesAvailable = startContainers( + queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners); // Start opportunistic containers, if resources available. if (resourcesAvailable) { - startContainersFromQueue(queuedOpportunisticContainers.values()); + startContainers(queuedOpportunisticContainers.values(), false); } } - private boolean startContainersFromQueue( - Collection queuedContainers) { - Iterator cIter = queuedContainers.iterator(); + private boolean startContainers( + Collection containersToBeStarted, boolean force) { + Iterator cIter = containersToBeStarted.iterator(); boolean resourcesAvailable = true; while (cIter.hasNext() && resourcesAvailable) { Container container = cIter.next(); - if (tryStartContainer(container)) { + if (tryStartContainer(container, force)) { cIter.remove(); } else { resourcesAvailable = false; @@ -303,9 +313,11 @@ public class ContainerScheduler extends AbstractService implements return resourcesAvailable; } - private boolean tryStartContainer(Container container) { + private boolean tryStartContainer(Container container, boolean force) { boolean containerStarted = false; - if (resourceAvailableToStartContainer(container)) { + // call startContainer without checking available resource when force==true + if (force || resourceAvailableToStartContainer( + container)) { startContainer(container); containerStarted = true; } @@ -373,7 +385,12 @@ public class ContainerScheduler extends AbstractService implements // enough number of opportunistic containers. if (isGuaranteedContainer) { enqueueContainer(container); - startPendingContainers(); + + // When opportunistic container not allowed (which is determined by + // max-queue length of pending opportunistic containers <= 0), start + // guaranteed containers without looking at available resources. + boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0); + startPendingContainers(forceStartGuaranteedContainers); // if the guaranteed container is queued, we need to preempt opportunistic // containers for make room for it @@ -386,12 +403,12 @@ public class ContainerScheduler extends AbstractService implements // containers based on remaining resource available, then enqueue the // opportunistic container. If the container is enqueued, we do another // pass to try to start the newly enqueued opportunistic container. - startPendingContainers(); + startPendingContainers(false); boolean containerQueued = enqueueContainer(container); // container may not get queued because the max opportunistic container // queue length is reached. If so, there is no point doing another pass if (containerQueued) { - startPendingContainers(); + startPendingContainers(false); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 220914d05e1..bc166da406f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -147,7 +147,7 @@ public class TestContainerManager extends BaseContainerManagerTest { @Before public void setup() throws IOException { conf.setInt( - YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 0); super.setup(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerBehaviorCompatibility.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerBehaviorCompatibility.java new file mode 100644 index 00000000000..c531cc8d201 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerBehaviorCompatibility.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Make sure CantainerScheduler related changes are compatible to old behaviors + */ +public class TestContainerSchedulerBehaviorCompatibility + extends BaseContainerManagerTest { + public TestContainerSchedulerBehaviorCompatibility() + throws UnsupportedFileSystemException { + super(); + } + + @Before + public void setup() throws IOException { + conf.setInt(YarnConfiguration.NM_VCORES, 1); + conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, + 0); + super.setup(); + } + + @Test + public void testForceStartGuaranteedContainersWhenOppContainerDisabled() + throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + containerLaunchContext.setCommands(Arrays.asList("echo")); + + List list = new ArrayList<>(); + + // Add a container start request with #vcores > available (1). + // This could happen when DefaultContainerCalculator configured because + // on the RM side it won't check vcores at all. + list.add(StartContainerRequest.newInstance(containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, BuilderUtils.newResource(2048, 4), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + + StartContainersRequest allRequests = StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + ContainerScheduler cs = containerManager.getContainerScheduler(); + int nQueuedContainers = cs.getNumQueuedContainers(); + int nRunningContainers = cs.getNumRunningContainers(); + + // Wait at most 10 secs and we expect all containers finished. + int maxTry = 100; + int nTried = 1; + while (nQueuedContainers != 0 || nRunningContainers != 0) { + Thread.sleep(100); + nQueuedContainers = cs.getNumQueuedContainers(); + nRunningContainers = cs.getNumRunningContainers(); + nTried++; + if (nTried > maxTry) { + Assert.fail("Failed to get either number of queuing containers to 0 or " + + "number of running containers to 0, #queued=" + nQueuedContainers + + ", #running=" + nRunningContainers); + } + } + } +}